Event / Task Queue Multithreading C++


Solution 1

There's a Futures library making its way into Boost and the C++ standard library. There's also something of the same sort in ACE, but I would hate to recommend it to anyone (as @lothar already pointed out, it's the Active Object pattern).
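For a rough idea of what a futures-based design buys you, here is a minimal sketch (untested) using what those proposals eventually became in C++11, std::packaged_task and std::future; Boost has equivalents. The Dispatcher class and its post() method are illustrative names, not part of any library: every queued call runs on a single worker thread, and the caller immediately gets a future it can wait on or simply discard.

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class Dispatcher
{
public:
    Dispatcher() : done(false), worker(&Dispatcher::run, this) {}

    ~Dispatcher()
    {
        {
            std::lock_guard<std::mutex> lock(m);
            done = true;
        }
        cv.notify_one();
        worker.join();
    }

    // Queue a callable; it will run on the worker thread, and the caller
    // gets a future it can wait on (or discard for fire-and-forget).
    std::future<int> post(std::function<int()> fn)
    {
        std::packaged_task<int()> task(std::move(fn));
        std::future<int> result = task.get_future();
        {
            std::lock_guard<std::mutex> lock(m);
            tasks.push(std::move(task));
        }
        cv.notify_one();
        return result;
    }

private:
    // Single worker thread: drains the queue and runs every task here.
    void run()
    {
        for (;;)
        {
            std::packaged_task<int()> task;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [this] { return done || !tasks.empty(); });
                if (done && tasks.empty())
                    return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();   // result (or exception) is delivered to the future
        }
    }

    bool done;
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::packaged_task<int()> > tasks;
    std::thread worker;
};

int main()
{
    Dispatcher d;
    std::future<int> f = d.post([] { return 6 * 7; });  // returns immediately
    std::cout << f.get() << std::endl;                  // blocks until the worker ran it
    return 0;
}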

Solution 2

Below is an implementation which doesn't require a "functionProxy" method. Although it makes adding new methods easier, it's still messy.

Boost::Bind and "Futures" do seem like they would tidy a lot of this up. I guess I'll have a look at the boost code and see how it works. Thanks for your suggestions everyone.

GThreadObject.h

#include <queue>

using namespace std;

class GThreadObject
{

    template <int size>
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argOne, int argTwo);


private:
    void newEvent(void (GThreadObject::*)(void*), void * argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argOne, int argTwo);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include <iostream>
#include <cstdlib>   // malloc/free
#include <cstring>   // memcpy
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from the queue.
 * When a new event is received, it executes the attached function pointer.
 * Thread code removed to reduce clutter.
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack frame the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit I would like to remove.
     * Only supports an 8-byte argument size, e.g. 2 ints OR pointer + int OR myObject8bytesSize.
     * Subsequent data sizes would need to be added with an else if.
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
    }

    //Clean up
    jobQueue.pop();
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), void * argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would normally signal a thread condition variable; replaced with a direct call here
    this->workerThread();

}

/*
 * This is the public interface; it can be called from child threads.
 * Instead of executing the event directly, it adds it to a job queue;
 * the workerThread then picks it up and executes all tasks on the same thread.
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (void*)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (void*)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include <iostream>
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

Edit: Just for completeness, I did an implementation with boost::bind. Key differences:

queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    jobQueue.pop();
    func();
}

Using the Boost implementation, 10,000,000 iterations of functionOne() took ~19 s, whereas the non-Boost implementation took only ~6.5 s, so it's roughly 3x slower. I'm guessing that finding a good non-locking queue will be the biggest performance bottleneck here, but it's still quite a big difference.
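For reference, before reaching for a lock-free queue, a plain locked version of the boost::function variant would look roughly like this (untested sketch; queueMutex and queueCondition are assumed Boost.Thread members not shown above). Solution 5 below shows the same synchronization idea in more detail.

// Locked baseline: the producer pushes under the mutex and notifies,
// the worker waits on the condition variable and runs jobs on its own thread.
void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    {
        boost::lock_guard<boost::mutex> lock(queueMutex);
        jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));
    }
    queueCondition.notify_one();
}

void GThreadObjectBoost::workerThread()
{
    for (;;)
    {
        boost::function<void ()> func;
        {
            boost::unique_lock<boost::mutex> lock(queueMutex);
            while (jobQueue.empty())
                queueCondition.wait(lock);
            func = jobQueue.front();
            jobQueue.pop();
        }
        func();   // run the job outside the lock
    }
}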

Solution 3

The POCO library has something along the same lines called ActiveMethod (along with some related functionality, e.g. ActiveResult) in its threading section. The source code is readily available and easy to understand.
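For a flavour of what that looks like, here is a sketch adapted from the ActiveMethod example in POCO's documentation (untested; exact signatures may differ between POCO versions). Note that ActiveMethod dispatches invocations to POCO's default thread pool rather than a single dedicated thread, and it takes a single argument, so multiple arguments would need to be bundled into a struct.

#include "Poco/ActiveMethod.h"
#include "Poco/ActiveResult.h"
#include <iostream>

class ActiveAdder
{
public:
    ActiveAdder() : add(this, &ActiveAdder::addImpl) {}

    // Public interface: invoking add() returns immediately with an ActiveResult.
    Poco::ActiveMethod<int, int, ActiveAdder> add;

private:
    // Runs on a POCO-managed thread, not the caller's.
    int addImpl(const int& n)
    {
        return n + n;
    }
};

int main()
{
    ActiveAdder adder;
    Poco::ActiveResult<int> result = adder.add(21);  // does not block the caller
    result.wait();                                   // optional: wait for completion
    std::cout << result.data() << std::endl;         // prints 42
    return 0;
}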

Solution 4

You might be interested in Active Object, one of the patterns of the ACE framework.

As Nikolai pointed out futures are planned for standard C++ some time in the future (pun intended).

Solution 5

You can solve this by using Boost's Thread library. Something like this (half pseudo-code):


class GThreadObject
{
        ...

        public:
                GThreadObject()
                : _done(false)
                , _newJob(false)
                , _thread(boost::bind(&GThreadObject::workerThread, this))
                {
                }

                ~GThreadObject()
                {
                        _done = true;

                        _thread.join();
                }

                void functionOne(char *argOne, int argTwo)
                {
                        ...

                        _jobQueue.push(myEvent);

                        {
                                boost::lock_guard<boost::mutex> l(_mutex);

                                _newJob = true;
                        }

                        _cond.notify_one();
                }

        private:
                void workerThread()
                {
                        while (!_done) {
                                boost::unique_lock<boost::mutex> l(_mutex);

                                while (!_newJob) {
                                        _cond.wait(l);
                                }

                                Event *receivedEvent = _jobQueue.front();

                                ...
                        }
                }

        private:
                volatile bool             _done;
                volatile bool             _newJob;
                boost::thread             _thread;
                boost::mutex              _mutex;
                boost::condition_variable _cond;
                std::queue<Event*>        _jobQueue;
};

Also, note how RAII allows us to keep this code smaller and easier to manage.


Comments

  • Admin
    Admin almost 2 years

    I would like to create a class whose methods can be called from multiple threads, but instead of executing the method on the thread from which it was called, it should perform them all on its own thread. No result needs to be returned, and it shouldn't block the calling thread.

    A first-attempt implementation is included below. The public methods insert a function pointer and data into a job queue, which the worker thread then picks up. However, it's not particularly nice code, and adding new methods is cumbersome.

    Ideally I would like to use this as a base class to which I can easily add methods (with a variable number of arguments) with minimal hassle and code duplication.

    What is a better way to do this? Is there any existing code available which does something similar? Thanks

    #include <queue>
    
    using namespace std;
    
    class GThreadObject
    {
        class event
        {
            public:
            void (GThreadObject::*funcPtr)(void *);
            void * data;
        };
    
    public:
        void functionOne(char * argOne, int argTwo);
    
    private:
        void workerThread();
        queue<GThreadObject::event*> jobQueue;
        void functionOneProxy(void * buffer);
        void functionOneInternal(char * argOne, int argTwo);
    
    };
    
    
    
    #include <iostream>
    #include <cstdlib>   // malloc/free
    #include <cstring>   // memcpy
    #include "GThreadObject.h"
    
    using namespace std;
    
    /* On a continuous loop, reading tasks from queue
     * When a new event is received it executes the attached function pointer
     * It should block on a condition variable, but thread code is removed to reduce clutter
     */
    void GThreadObject::workerThread()
    {
        //New Event added, process it
        GThreadObject::event * receivedEvent = jobQueue.front();
    
        //Execute the function pointer with the attached data
        (*this.*receivedEvent->funcPtr)(receivedEvent->data);
        jobQueue.pop();
    }
    
    /*
     * This is the public interface; it can be called from child threads.
     * Instead of executing the event directly, it adds it to a job queue;
     * the workerThread then picks it up and executes all tasks on the same thread.
     */
    void GThreadObject::functionOne(char * argOne, int argTwo)
    {
    
        //Malloc an object the size of the function arguments
        int argumentSize = sizeof(char*)+sizeof(int);
        void * myData = malloc(argumentSize);
        //Copy the data passed to this function into the buffer
        memcpy(myData, &argOne, argumentSize);
    
        //Create the event and push it on to the queue
        GThreadObject::event * myEvent = new event;
        myEvent->data = myData;
        myEvent->funcPtr = &GThreadObject::functionOneProxy;
        jobQueue.push(myEvent);
    
        //This would be send a thread condition signal, replaced with a simple call here
        this->workerThread();
    }
    
    /*
     * This handles the actual event
     */
    void GThreadObject::functionOneInternal(char * argOne, int argTwo)
    {
        cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl;
    
        //Now do the work
    }
    
    /*
     * This is the function I would like to remove if possible
     * Split the void * buffer into arguments for the internal Function
     */
    void GThreadObject::functionOneProxy(void * buffer)
    {
        char * cBuff = (char*)buffer;
        functionOneInternal(*((char**)cBuff), *((int*)(cBuff+sizeof(char*))));
    }
    
    int main()
    {
        GThreadObject myObj;
    
        myObj.functionOne("My Message", 23);
    
        return 0;
    }
    
  • Nathaniel Sharp
    Nathaniel Sharp almost 15 years
    I was looking at boost::futures, but as it's not part of a released Boost version, I had to fall back to my trusted ACE :-)
  • Grant Peters
    Grant Peters almost 15 years
    I highly recommend against using mutexes (or any other form of synchronization primitive), as they do not scale at all with multiple processors (after about 4-8 they actually decrease performance). Look into lock-free coding for truly scalable implementations. Also, if you need to use a synchronization primitive, use a critical section, as they are faster than a mutex (a mutex is process-safe, a critical section is thread-safe, i.e. use a mutex when synchronizing between processes, a CS when syncing threads in the same process).
  • Grant Peters
    Grant Peters almost 15 years
    This is the exact method we use for developing games on next-gen systems at my company (for an added performance bonus, look into adding items to the work queue via lock-free methods)
  • Mr Fooz
    Mr Fooz almost 15 years
    Is std::queue::push threadsafe? It seems that the functionOne lock_guard should go before the _jobQueue.push call.
  • Ignas Limanauskas
    Ignas Limanauskas almost 15 years
    Any tips on implementing the lock-free queue?
  • Grant Peters
    Grant Peters almost 15 years
    There are quite a few good examples of lock-free queues on the net (it's one of the easiest lock-free structures to implement). This is the one that I cut my teeth on when getting onto the lock-free bandwagon: boyet.com/Articles/LockfreeQueue.html
  • Grant Peters
    Grant Peters almost 15 years
    Oh, just beware that the article I linked to suffers from the ABA problem; there is a link at the top of the page that details this problem, and there are better solutions out there on the net, but this is one of the easiest tutorials I've seen to learn from.
  • Anthony Williams
    Anthony Williams over 14 years
    The futures library will be part of Boost 1.41. It's also available as part of my C++0x thread library implementation at stdthread.co.uk
  • Nikolai Fetissov
    Nikolai Fetissov over 14 years
    Thanks, Anthony. Nice to hear from you :)