(C++ Threads): Creating worker threads that will be listening to jobs and executing them concurrently when wanted

17,932

Solution 1

For this purpose you can use boost's threadpool class. It's efficient and well tested. opensource library instead of you writing newly and stabilizing it.

http://threadpool.sourceforge.net/

main()
{
    pool tp(2);   //number of worker threads-currently its 2.

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

void first_task()
{
    ...
}

void second_task()
{
    ...
}

Note:

Suggestion for your example: You don't need to have individual mutex object for each thread. Single mutex object lock itself will does the synchronization between all the threads. You are locking mutex of one thread in executejob function and without unlocking another thread is calling lock with different mutex object leading to deadlock or undefined behaviour.

Also since you are calling mutex.lock() inside whileloop without unlocking , same thread is trying to lock itself with same mutex object infinately leading to undefined behaviour.

If you donot need to execute threads parallel you can have one global mutex object can be used inside executejob function to lock and unlock.

mutex m;

void executeJob(int worker)
{
    m.lock();

    //do some job

    m.unlock();
}

If you want to execute job parallel use boost threadpool as I suggested earlier.

Solution 2

In general you can write an algorithm similar to the following. It works with pthreads. I'm sure it would work with c++ threads as well.

  1. create threads and make them wait on a condition variable, e.g. work_exists.
  2. When work arrives you notify all threads that are waiting on that condition variable. Then in the main thread you start waiting on another condition variable work_done
  3. Upon receiving work_exists notification, worker threads wake up, and grab their assigned work from jobs[worker], they execute it, they send a notification on work_done variable, and then go back to waiting on the work_exists condition variable
  4. When main thread receives work_done notification it checks if all threads are done. If not, it keeps waiting till the notification from last-finishing thread arrives.

Solution 3

From cppreference's page on std::mutex::unlock:

The mutex must be unlocked by all threads that have successfully locked it before being destroyed. Otherwise, the behavior is undefined.

Your approach of having one thread unlock a mutex on behalf of another thread is incorrect.

The behavior you're attempting would normally be done using std::condition_variable. There are examples if you look at the links to the member functions.

Share:
17,932
ksm001
Author by

ksm001

Updated on July 30, 2022

Comments

  • ksm001
    ksm001 over 1 year

    Suppose we have two workers. Each worker has an id of 0 and 1. Also suppose that we have jobs arriving all the time, each job has also an identifier 0 or 1 which specifies which worker will have to do this job.

    I would like to create 2 threads that are initially locked, and then when two jobs arrive, unlock them, each of them does their job and then lock them again until other jobs arrive.

    I have the following code:

      #include <iostream>
      #include <thread>
      #include <mutex>
    
      using namespace std;
    
      struct job{
    
          thread jobThread;
          mutex jobMutex;
    
      };
    
      job jobs[2];
    
    
      void executeJob(int worker){
    
          while(true){
    
              jobs[worker].jobMutex.lock();
    
              //do some job
    
          }
    
       }
    
      void initialize(){
    
          int i;
          for(i=0;i<2;i++){
                    jobs[i].jobThread = thread(executeJob, i);
          }
    
       }
    
      int main(void){
    
          //initialization
          initialize();
    
          int buffer[2];
          int bufferSize = 0;
    
          while(true){
              //jobs arrive here constantly, 
                //once the buffer becomes full, 
                //we unlock the threads(workers) and they start working
              bufferSize = 2;
              if(bufferSize == 2){
                  for(int i = 0; i<2; i++){
                      jobs[i].jobMutex.unlock();
                  }
              }
               break;
         }
    
      }
    

    I started using std::thread a few days ago and I'm not sure why but Visual Studio gives me an error saying abort() has been called. I believe there's something missing however due to my ignorance I can't figure out what.

    I would expect this piece of code to actually

    1. Initialize the two threads and then lock them

    2. Inside the main function unlock the two threads, the two threads will do their job(in this case nothing) and then they will become locked again.

    But it gives me an error instead. What am I doing wrong?

    Thank you in advance!

  • ksm001
    ksm001 over 10 years
    could you show me a simple example of this? Unfortunately I've been searching for the past hour and couldn't find a clear reference on how to create a thread pool of my own, most implementations use boost. Other suggestions are good as well, however I haven't used boost library ever before, maybe I should start doing so.