How to multithread queue processing

12,592

Solution 1

I ended up implementing a BlockingQueue, with the suggested fix to pop, here:

Creating a Blocking Queue

Solution 2

As pointed in the comments, STL containers are not thread-safe for read-write operations. Instead, try concurrent_queue class from TBB or PPL, e.g.:

void doWork(concurrent_queue<string>& itemQueue) {
    string result;
    while(itemQueue.try_pop(result)) {
        // you have `result`
    }   
}
Share:
12,592
Chris Redford
Author by

Chris Redford

PhD, Computer Science

Updated on June 05, 2022

Comments

  • Chris Redford
    Chris Redford almost 2 years

    C++ containers are supposed to be thread-safe by default. I must be using queue to multithread incorrectly because for this code:

    #include <thread>
    using std::thread;
    #include <iostream>
    using std::cout;
    using std::endl;
    #include <queue>
    using std::queue;
    #include <string>
    using std::string;
    using std::to_string;
    #include <functional>
    using std::ref;
    
    
    void fillWorkQueue(queue<string>& itemQueue) {
        int size = 40000;
        for(int i = 0; i < size; i++)
            itemQueue.push(to_string(i));
    }
    
    void doWork(queue<string>& itemQueue) {
        while(!itemQueue.empty()) {
            itemQueue.pop();
        }   
    }
    
    void singleThreaded() {
        queue<string> itemQueue;
        fillWorkQueue(itemQueue);
        doWork(itemQueue);
        cout << "done\n";
    }
    
    void multiThreaded() {
        queue<string> itemQueue;
        fillWorkQueue(itemQueue);
        thread t1(doWork, ref(itemQueue));
        thread t2(doWork, ref(itemQueue));
        t1.join();
        t2.join();
        cout << "done\n";
    }
    
    int main() {
        cout << endl;
    
        // Single Threaded
        cout << "singleThreaded\n";
        singleThreaded();
        cout << endl;
    
        // Multi Threaded
        cout << "multiThreaded\n";
        multiThreaded();
        cout << endl;
    }
    

    I'm getting:

    singleThreaded
    done
    
    multiThreaded
    main(32429,0x10e530000) malloc: *** error for object 0x7fe4e3883e00: pointer being freed was not allocated
    *** set a breakpoint in malloc_error_break to debug
    make: *** [run] Abort trap: 6
    

    What am I doing wrong here?

    EDIT

    Apparently I misread the link above. Is there a thread-safe queue implementation available that does what I am trying to do? I know this is a common thread organization strategy.