Windows API Thread Pool simple example

13,430

You're roughly right up to the last point.

The whole idea about a thread pool is that you don't care how many threads it has. You just throw a lot of work into the thread pool, and let the OS determine how to execute each chunk. So, if you create and submit 10 chunks, the OS may use between 1 and 10 threads from the pool.
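To see why the number of work items and the number of threads are independent, here is a small portable C++ sketch. It is not the Windows thread pool. The hypothetical helper `runOnFixedPool` starts a fixed number of `std::thread` workers that drain a queue of submitted chunks, and it reports how many distinct threads actually ran them.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mini "pool": nWorkers threads drain a queue of chunks.
// Returns how many distinct threads executed at least one chunk.
std::size_t runOnFixedPool(std::vector<std::function<void()>> chunks,
                           int nWorkers) {
    assert(nWorkers > 0);
    std::queue<std::function<void()>> pending;
    for (auto& c : chunks) pending.push(std::move(c));

    std::mutex m;                                // guards pending and usedThreads
    std::set<std::thread::id> usedThreads;

    auto worker = [&] {
        for (;;) {
            std::function<void()> job;
            {
                std::lock_guard<std::mutex> lock(m);
                if (pending.empty()) return;     // nothing left to do
                job = std::move(pending.front());
                pending.pop();
                usedThreads.insert(std::this_thread::get_id());
            }
            job();                               // run the chunk outside the lock
        }
    };

    std::vector<std::thread> workers;
    for (int i = 0; i < nWorkers; ++i) workers.emplace_back(worker);
    for (auto& t : workers) t.join();
    return usedThreads.size();
}
```

With 10 chunks and 3 workers, the function returns a number between 1 and 3, depending on scheduling. With the real pool, the OS makes that choice for you.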

You should not care about those thread identities. Don't bother with thread IDs, the minimum or maximum number of threads, or anything like that.

If you don't care about thread identities, how do you decide which part of the vector to change? Simple. Before submitting the work items, initialize a counter to zero. In the callback function, call InterlockedIncrement and use the value it returns (the incremented count) rather than reading the counter again afterwards. Each submitted work item then gets its own consecutive integer.
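The counter technique can be sketched in portable C++ as follows. This is an illustration under stated assumptions, not the Windows API. One `std::thread` per work item stands in for the pool, and `std::atomic<long>::fetch_add` stands in for InterlockedIncrement. `fetch_add` returns the value before the increment, so the sketch adds 1. The names `Batch`, `workItem` and `runBatch` are hypothetical. Each work item receives no index of its own and claims its slice from the shared counter.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// State shared by the work items of one batch (hypothetical names).
struct Batch {
    std::vector<int>& data;
    long nbItems;
    std::atomic<long> counter{0};   // zero before any item runs

    Batch(std::vector<int>& d, long n) : data(d), nbItems(n) {}
};

// Body of one work item, i.e. what the pool callback would do.
void workItem(Batch& b) {
    // fetch_add returns the OLD value; +1 mimics InterlockedIncrement,
    // which returns the NEW value. Every item gets a distinct id 1..nbItems.
    long id = b.counter.fetch_add(1) + 1;
    assert(id >= 1 && id <= b.nbItems);

    std::size_t n = b.data.size();
    std::size_t slice = static_cast<std::size_t>(id - 1);
    std::size_t nb = static_cast<std::size_t>(b.nbItems);
    for (std::size_t i = n * slice / nb; i < n * (slice + 1) / nb; ++i)
        b.data[i]++;                 // slices never overlap, so no data race
}

// "Submit" nbItems identical work items and wait for all of them.
// The loop index is deliberately NOT passed to the items.
void runBatch(std::vector<int>& data, long nbItems) {
    Batch b(data, nbItems);          // fresh counter for every batch
    std::vector<std::thread> items;
    for (long k = 0; k < nbItems; ++k)
        items.emplace_back(workItem, std::ref(b));
    for (auto& t : items)
        t.join();
}
```

Calling `runBatch(vec, 10)` three times on a `std::vector<int>(10000, 0)` leaves every element equal to 3, which matches the expected result in the question.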

Author: nbonneel

PostDoc at Harvard in computer graphics.

Updated on June 04, 2022

Comments

  • nbonneel
    nbonneel almost 2 years

    [EDIT: thanks to MSalters' answer and Raymond Chen's answer to "InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection", the problem is solved and the code below works properly. It should serve as a simple example of thread pool use in Windows.]

    I can't find a simple example of the following task. My program needs, for example, to increment every value in a huge std::vector by one, so I want to do that in parallel, and it has to do so many times over the lifetime of the program. I know how to do it by calling CreateThread each time the routine is called, but I can't figure out how to replace CreateThread with the thread pool.

    Here is what I do:

    #include <windows.h>
    #include <vector>
    
    class Thread {
    public:
        Thread() {}
        virtual ~Thread() {}          // objects are deleted through Thread*
        virtual void run() = 0;       // I can inherit an "IncrementVectorThread"
    };
    
    class IncrementVectorThread : public Thread {
    public:
        IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec)
            : id(threadID), nb(nbThreads), myvec(vec) {}
    
        virtual void run() {
            // each task increments only its own slice [begin, end) of the vector
            size_t begin = (myvec.size() * id) / nb;
            size_t end   = (myvec.size() * (id + 1)) / nb;
            for (size_t i = begin; i < end; i++)
                myvec[i]++;
        }
        int id, nb;
        std::vector<int> &myvec;
    };
    
    class ThreadGroup : public std::vector<Thread*> {
    public:
        ThreadGroup() {
            pool = CreateThreadpool(NULL);
            InitializeThreadpoolEnvironment(&cbe);
            cleanupGroup = CreateThreadpoolCleanupGroup();
            SetThreadpoolCallbackPool(&cbe, pool);
            SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
            threadCount = 0;
        }
        ~ThreadGroup() {
            // release the cleanup group (and anything still in it), then the pool
            CloseThreadpoolCleanupGroupMembers(cleanupGroup, FALSE, NULL);
            CloseThreadpoolCleanupGroup(cleanupGroup);
            CloseThreadpool(pool);
            DestroyThreadpoolEnvironment(&cbe);
        }
        PTP_POOL pool;
        TP_CALLBACK_ENVIRON cbe;
        PTP_CLEANUP_GROUP cleanupGroup;
        volatile long threadCount;
    };
    
    static VOID CALLBACK runFunc(
                    PTP_CALLBACK_INSTANCE Instance,
                    PVOID Context,
                    PTP_WORK Work) {
        ThreadGroup &thread = *((ThreadGroup*) Context);
        // Use the value InterlockedIncrement returns; reading threadCount again
        // afterwards could pick up another callback's increment.
        long id = InterlockedIncrement(&(thread.threadCount));
        size_t tid = (id - 1) % thread.size();
        thread[tid]->run();
    }
    
    void run_threads(ThreadGroup* thread_group) {
        DWORD n = (DWORD) thread_group->size();
        SetThreadpoolThreadMaximum(thread_group->pool, n);
        SetThreadpoolThreadMinimum(thread_group->pool, n);
    
        PTP_WORK worker = CreateThreadpoolWork(runFunc, (PVOID) thread_group, &thread_group->cbe);
        thread_group->threadCount = 0;        // reset so this batch's ids start at 1
        for (DWORD i = 0; i < n; i++)
            SubmitThreadpoolWork(worker);     // one work object, submitted n times
        WaitForThreadpoolWorkCallbacks(worker, FALSE);  // wait for all n callbacks
        CloseThreadpoolWork(worker);
    }
    
    int main() {
        ThreadGroup group;
        std::vector<int> vec(10000, 0);
        for (int i = 0; i < 10; i++)
            group.push_back(new IncrementVectorThread(i, 10, vec));
    
        run_threads(&group);
        run_threads(&group);
        run_threads(&group);
    
        // now, vec should be == std::vector<int>(10000, 3);
    
        for (size_t i = 0; i < group.size(); i++)
            delete group[i];
        return 0;
    }
    

    So, if I understand correctly:
    - CreateThreadpool creates a set of threads (so the call to CreateThreadpoolWork is cheap, since it doesn't call CreateThread).
    - I can have as many thread pools as I want (if I want one thread pool for my "IncrementVector" threads and another for my "DecrementVector" threads, I can).
    - If I need to divide my "increment vector" task into 10 threads, instead of calling CreateThread 10 times, I create a single "worker" and submit it 10 times to the thread pool with the same parameter (so I need some kind of thread index in the callback to know which part of my std::vector to increment). I couldn't find such an index: GetCurrentThreadId() returns the real ID of the thread (i.e., something like 1528, not a number between 0 and nb_launched_threads).

    Finally, I am not sure I understood the concept: do I really need a single worker, and not 10, if I split my std::vector into 10 parts?

    Thanks!

  • nbonneel
    nbonneel over 12 years
    thanks - upvoted! I did what you suggested, but with the simple code above, I still don't get the proper values in the vector at the end :s
  • nbonneel
    nbonneel over 12 years
    answer re-validated: the counter was not incremented at the correct place :) Thanks!
  • nbonneel
    nbonneel over 12 years
    sorry, I un-validate so as to re-open the question. The code above fails to give a vector containing only "3" for about 7 out of 10000 executions :s
  • MSalters
    MSalters over 12 years
    That's unlikely to be related to the thread pool but more due to a lack of synchronization between those threads.
  • nbonneel
    nbonneel over 12 years
    It looks like the InterlockedIncrement is not locking the counter properly :s I tried the InterlockedIncrement64 with long long variables, and InterlockedIncrementAcquire/Release, with and without aligned memory allocation with _aligned_malloc, but I still get [probably] a concurrency issue.
  • nbonneel
    nbonneel over 12 years
    (the integers I get are not always strictly consecutive)
  • nbonneel
    nbonneel over 12 years
    whoa ok, I'm making progress: just by replacing the InterlockedAdd with EnterCriticalSection/counter++/LeaveCriticalSection, this works fine... any idea why? (ok, I realize this starts to diverge from the original question: I re-validate and open a new thread about this issue). Thanks!
  • Raymond Chen
    Raymond Chen over 12 years
    You are using InterlockedIncrement incorrectly. See the other question for an explanation.
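The comment thread comes down to how the index is read. A common way to misuse InterlockedIncrement is to increment the counter and then read the variable again, instead of using the function's return value. Going by the comments, that is plausibly the bug here, but this is an assumption. The portable C++ sketch below uses `std::atomic` and `std::mutex` as stand-ins for InterlockedIncrement and EnterCriticalSection/LeaveCriticalSection. All function names in it are hypothetical.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

std::atomic<long> sharedCounter{0};
std::mutex counterLock;
long lockedCounter = 0;                 // only touched under counterLock

// BUGGY: increment, then read the counter again. Another thread may
// increment in between, so two callers can get the same id and an id
// can be skipped.
long claimByRereading() {
    sharedCounter.fetch_add(1);
    return sharedCounter.load();
}

// CORRECT: use the value returned by the atomic increment itself.
// fetch_add returns the old value; +1 mimics InterlockedIncrement.
long claimByReturnValue() {
    return sharedCounter.fetch_add(1) + 1;
}

// ALSO CORRECT: increment and read inside one critical section, like
// EnterCriticalSection / counter++ / LeaveCriticalSection.
long claimUnderLock() {
    std::lock_guard<std::mutex> guard(counterLock);
    return ++lockedCounter;
}

void resetCounters() {
    sharedCounter.store(0);
    std::lock_guard<std::mutex> guard(counterLock);
    lockedCounter = 0;
}

// Calls `claim` perThread times from each of nThreads threads and checks
// that the ids handed out are exactly 1..nThreads*perThread, each once.
bool idsAreConsecutive(long (*claim)(), int nThreads, int perThread) {
    assert(nThreads > 0 && perThread > 0);
    resetCounters();
    std::vector<std::vector<long>> got(nThreads);
    std::vector<std::thread> threads;
    for (int t = 0; t < nThreads; ++t)
        threads.emplace_back([&got, claim, t, perThread] {
            for (int k = 0; k < perThread; ++k)
                got[t].push_back(claim());
        });
    for (auto& th : threads)
        th.join();

    std::vector<long> all;
    for (const auto& v : got)
        all.insert(all.end(), v.begin(), v.end());
    std::sort(all.begin(), all.end());
    for (std::size_t i = 0; i < all.size(); ++i)
        if (all[i] != static_cast<long>(i + 1))
            return false;
    return true;
}
```

`idsAreConsecutive(claimByReturnValue, 8, 1000)` and `idsAreConsecutive(claimUnderLock, 8, 1000)` should always return true. The re-reading variant fails only when two claims interleave, so it can pass many runs before it fails. That is consistent with the "about 7 out of 10000 executions" symptom described in the comments.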