C# ThreadPool - limit number of threads

Solution 1

I would use Parallel.For and set MaxDegreeOfParallelism accordingly.

Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
  i =>
  {
    GetPage(pageList[i]);
  });

Solution 2

Just reverse the order of the two calls, from:

ThreadPool.SetMaxThreads(1, 0);
ThreadPool.SetMinThreads(1, 0);

To:

ThreadPool.SetMinThreads(1, 0);
ThreadPool.SetMaxThreads(1, 0);

You can't set the maximum number of threads lower than the current minimum, so set the minimum first.
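
Both SetMinThreads and SetMaxThreads return a bool saying whether the change was accepted, which is easy to overlook. The following is only a minimal sketch to illustrate the point above: it asks for a maximum one below the current minimum and reports the result. The class and method names (ThreadPoolLimits, TrySetMaxBelowMin) are made up for this example.

```csharp
using System;
using System.Threading;

static class ThreadPoolLimits
{
    // Asks the pool for a worker-thread maximum that is one below the current
    // minimum and returns whether the pool accepted the request.
    public static bool TrySetMaxBelowMin()
    {
        int minWorkers, minIo, maxWorkers, maxIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);

        // The I/O completion-port maximum is passed back unchanged.
        return ThreadPool.SetMaxThreads(minWorkers - 1, maxIo);
    }

    static void Main()
    {
        // A rejected request leaves the pool's limits unchanged.
        Console.WriteLine("SetMaxThreads accepted: {0}", TrySetMaxBelowMin());
    }
}
```

Checking the return value this way would also explain why the calls in the question "seem to have no impact": a rejected call changes nothing and throws no exception.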

Solution 3

Personally, I would use SmartThreadPool and leave the ThreadPool alone. However, this is probably what you want: C# thread pool limiting threads

Included code from link (please give the original author credit, not me)

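// Create this once and share it between all workers; at most 3 can hold it at a time.
System.Threading.Semaphore S = new System.Threading.Semaphore(3, 3);

// wait your turn (decrement); done outside the try so that a failed
// wait is never followed by a Release
S.WaitOne();
try
{
  // do your thing
}
finally
{
  // release so others can go (increment)
  S.Release();
}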
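
To show how the semaphore fits together with ThreadPool.QueueUserWorkItem, here is a rough, self-contained sketch. It assumes .NET 4 or later (for CountdownEvent), uses Thread.Sleep as a stand-in for the real download, and the names SemaphoreThrottle and Run are invented for illustration.

```csharp
using System;
using System.Threading;

static class SemaphoreThrottle
{
    // Queues totalTasks work items but lets at most maxConcurrent of them run
    // their body at the same time. Returns the highest concurrency observed.
    public static int Run(int totalTasks, int maxConcurrent)
    {
        int running = 0, peak = 0;
        object statsLock = new object();

        using (Semaphore gate = new Semaphore(maxConcurrent, maxConcurrent))
        using (CountdownEvent done = new CountdownEvent(totalTasks))
        {
            for (int i = 0; i < totalTasks; i++)
            {
                ThreadPool.QueueUserWorkItem(delegate
                {
                    gate.WaitOne();           // acquire before entering the try
                    try
                    {
                        lock (statsLock)
                        {
                            running++;
                            if (running > peak) peak = running;
                        }
                        Thread.Sleep(50);     // placeholder for the real work
                        lock (statsLock) { running--; }
                    }
                    finally
                    {
                        gate.Release();       // let the next waiter in
                        done.Signal();        // mark this work item as complete
                    }
                });
            }
            done.Wait();                      // block until every item has signalled
        }
        return peak;
    }

    static void Main()
    {
        Console.WriteLine("Peak concurrency: {0}", Run(10, 3));
    }
}
```

Note that work items waiting on the semaphore still occupy pool threads while they wait, so this limits how many run the protected section at once, not how many threads the pool creates.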

Solution 4

Description

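You can do this using the ThreadPool.SetMaxThreads method. Note that it returns false and changes nothing if the requested value is lower than the number of processors on the machine.

But there are some problems with using the ThreadPool for WebRequest. See, for example, "Bug in ThreadPool or HttpWebRequest?"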

Sample

ThreadPool.SetMaxThreads(2,2);

Edit

Personally, I would use AsParallel from LINQ for this.
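
As a rough illustration of the AsParallel approach, the sketch below caps concurrency with WithDegreeOfParallelism. FetchLength is a hypothetical stand-in for the question's GetPage, and PlinqThrottle and FetchAll are names invented for this example.

```csharp
using System;
using System.Linq;
using System.Threading;

static class PlinqThrottle
{
    // Hypothetical placeholder for the real download; it just sleeps briefly
    // and returns the length of the URL string.
    static int FetchLength(string url)
    {
        Thread.Sleep(20);
        return url.Length;
    }

    // Processes all urls with at most maxParallel running concurrently and
    // returns the results in the same order as the input.
    public static int[] FetchAll(string[] urls, int maxParallel)
    {
        return urls.AsParallel()
                   .AsOrdered()
                   .WithDegreeOfParallelism(maxParallel)
                   .Select(url => FetchLength(url))
                   .ToArray();
    }

    static void Main()
    {
        int[] lengths = FetchAll(new[] { "a", "bb", "ccc" }, 2);
        Console.WriteLine(string.Join(", ", lengths));
    }
}
```

WithDegreeOfParallelism sets an upper bound on the number of concurrently executing tasks, so the query may use fewer than the requested number.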

Solution 5

If you are tied to .NET 2.0, you can use the following technique:

When you queue a task to the ThreadPool, it creates a new thread if no free one is available. So before queuing each task, you wait until a slot is free. The BlockingCounter class (described below) is used for this: once the limit is reached, an increment blocks until another thread decrements the counter. After all tasks have been queued, the counter is put into a "closed" state, indicating that no further increments will be made, and the caller waits for completion.

The sample below runs 10 tasks in total, with at most 4 running at a time.

using System;
using System.Threading;

class Program
{

    static int s_numCurrentThreads = 0;

    static Random s_rnd = new Random();

    static void Main(string[] args)
    {

        int maxParallelTasks = 4;
        int totalTasks = 10;

        using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks))
        {
            for (int i = 1; i <= totalTasks; i++)
            {

                Console.WriteLine("Submitting task {0}", i);
                blockingCounter.WaitableIncrement();
                if (!ThreadPool.QueueUserWorkItem((obj) =>
                                                      {
                                                          try
                                                          {
                                                              ThreadProc(obj);
                                                          }
                                                          catch (Exception ex)
                                                          {
                                                              Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message);
                                                          }
                                                          finally
                                                          {
                                                              // Exceptions are possible here too, 
                                                              // but proper error handling is not the goal of this sample
                                                              blockingCounter.WaitableDecrement();
                                                          }
                                                      }, i))
                {
                    blockingCounter.WaitableDecrement();
                    Console.Error.WriteLine("Failed to submit task {0} for execution.", i);
                }
            }

            Console.WriteLine("Waiting for completion...");
            blockingCounter.CloseAndWait(30000);
        }

        Console.WriteLine("Work done!");
        Console.ReadKey();

    }

    static void ThreadProc (object obj)
    {
        int taskNumber = (int) obj;
        int numThreads = Interlocked.Increment(ref s_numCurrentThreads);

        Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads);
        int sleepTime;
        lock (s_rnd) // Random is not thread-safe, so guard the shared instance
        {
            sleepTime = s_rnd.Next(0, 5);
        }
        Thread.Sleep(sleepTime * 1000);
        Console.WriteLine("Task {0} finished.", taskNumber);

        Interlocked.Decrement(ref s_numCurrentThreads);
    }
}

It uses the BlockingCounter class, which is based on Marc Gravell's SizeQueue posted here, but with a counter instead of a queue. When you have finished queuing new tasks, call the Close() method and then wait for the counter to finish.

public class BlockingCounter : IDisposable
{
    private int m_Count;
    private object m_counterLock = new object();

    private bool m_isClosed = false;
    private volatile bool m_isDisposed = false;

    private int m_MaxSize = 0;

    private ManualResetEvent m_Finished = new ManualResetEvent(false);

    public BlockingCounter(int maxSize = 0)
    {
        if (maxSize < 0)
            throw new ArgumentOutOfRangeException("maxSize");
        m_MaxSize = maxSize;
    }


    public void WaitableIncrement(int timeoutMs = Timeout.Infinite)
    {
        lock (m_counterLock)
        {
            while (m_MaxSize > 0 && m_Count >= m_MaxSize)
            {
                CheckClosedOrDisposed();
                if (!Monitor.Wait(m_counterLock, timeoutMs))
                    throw new TimeoutException("Failed to wait for counter to decrement.");
            }

            CheckClosedOrDisposed();
            m_Count++;

            if (m_Count == 1)
            {
                Monitor.PulseAll(m_counterLock);
            }

        }
    }

    public void WaitableDecrement(int timeoutMs = Timeout.Infinite)
    {
        lock (m_counterLock)
        {
            try
            {
                while (m_Count == 0)
                {
                    CheckClosedOrDisposed();
                    if (!Monitor.Wait(m_counterLock, timeoutMs))
                        throw new TimeoutException("Failed to wait for counter to increment.");
                }

                CheckDisposed();

                m_Count--;

                if (m_MaxSize == 0 || m_Count == m_MaxSize - 1)
                    Monitor.PulseAll(m_counterLock);
            }
            finally
            {
                if (m_isClosed && m_Count == 0)
                    m_Finished.Set();
            }
        }
    }

    void CheckClosedOrDisposed()
    {
        if (m_isClosed)
            throw new Exception("The counter is closed");
        CheckDisposed();
    }

    void CheckDisposed()
    {
        if (m_isDisposed)
            throw new ObjectDisposedException("The counter has been disposed.");
    }

    public void Close()
    {
        lock (m_counterLock)
        {
            CheckDisposed();
            m_isClosed = true;
            Monitor.PulseAll(m_counterLock);
        }
    }

    public bool WaitForFinish(int timeoutMs = Timeout.Infinite)
    {
        CheckDisposed();
        lock (m_counterLock)
        { 
             if (m_Count == 0)
                 return true;
        }
        return m_Finished.WaitOne(timeoutMs);
    }

    public void CloseAndWait (int timeoutMs = Timeout.Infinite)
    {
        Close();
        WaitForFinish(timeoutMs);
    }

    public void Dispose()
    {
        if (!m_isDisposed)
        {
            m_isDisposed = true;
            lock (m_counterLock)
            {
                // Wake up all waiting threads, so that they know the object
                // is disposed and there's nothing left to wait for
                Monitor.PulseAll(m_counterLock);
            }
            m_Finished.Close();
        }
    }
}

The output will look something like this:

Submitting task 1
Submitting task 2
Submitting task 3
Submitting task 4
Submitting task 5
Task 1 started. Total: 1
Task 1 finished.
Task 3 started. Total: 1
Submitting task 6
Task 2 started. Total: 2
Task 3 finished.
Task 6 started. Total: 4
Task 5 started. Total: 3
Task 4 started. Total: 4
Submitting task 7
Task 4 finished.
Submitting task 8
Task 7 started. Total: 4
Task 5 finished.
Submitting task 9
Task 7 finished.
Task 8 started. Total: 4
Task 9 started. Total: 4
Submitting task 10
Task 2 finished.
Waiting for completion...
Task 10 started. Total: 4
Task 10 finished.
Task 6 finished.
Task 8 finished.
Task 9 finished.
Work done!

Author: Bryan

Updated on November 11, 2020

Comments

  • Bryan over 3 years

    I am developing a console app.

    I want to use a Threadpool to perform web downloads. Here is some fake code.

     for (int loop=0; loop< 100; loop++)
     {
         ThreadPool.QueueUserWorkItem(new WaitCallback(GetPage), pageList[loop]);
     }
    
    
    snip
    
    private static void GetPage(object o)
    {
        //get the page
    }
    

    How do I prevent my code from starting more than two (or ten, or whatever) simultaneous threads?

    I have tried

        ThreadPool.SetMaxThreads(1, 0);
        ThreadPool.SetMinThreads(1, 0);
    

    But they seem to have no impact.

  • Erik Funkenbusch about 12 years
    You need to be careful, however. ThreadPool worker threads are often used for other purposes. For example, in a web application, ThreadPool workers process web requests.
  • Chris Gessler about 12 years
    FYI... this limits the total number of threads in the IIS ThreadPool, so basically users won't be able to access your site. This code should live on a separate website with its own thread pool.
  • Bryan about 12 years
    Added more detail to question
  • Alexsandro almost 8 years
    Can I have the same effect with Dataflow? Do you have a tip?
  • Jack Miller over 7 years
    This is the answer for the actual question: How do I prevent my code from starting...
  • AaA almost 4 years
    I believe the bug you linked to, and its thread, are not related to this question. The way he is using HttpWebRequest is what causes the issue with his implementation.