Multithreaded Observer Pattern

10,959

Solution 1

Consider the use of producer-consumer queues or message queues. For your example, you can use a queue in two ways:

  1. Changes to the Subject are queued. When something updates the subject, it puts the new state in the queue and returns immediately. This way, the updater does not block while the observers are notified. You will need a thread that continuously dequeues state changes and updates observers.

  2. Notifications to Observers are queued. Each observer has a queue where subject state-change notifications are posted.

If you are using the Qt library, you can use the signals & slots mechanism with the Qt::QueuedConnection connection type. The slot goes through the receiver's event queue and is executed in the receiver's thread. This way, the sender does not block while the receivers execute their respective slots.

Your program might be a good candidate for the Actor model (paradigm). Here are some C++ libraries that implement the actor model:

Your program might also be a good candidate for the Dataflow paradigm. Check out the proposed Boost Dataflow library, which supports threading.


I don't have a book to recommend, but check out Herb Sutter's series of Dr Dobbs articles on C++ concurrency.

Solution 2

I wrote a Multithreaded observer pattern in Java

import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * An observer pattern that allows listeners to register(), unregister() in
 * multiple threads and also notify listeners in another thread.
 * 
 * A HashMap keeps track of the listeners and their status (active, obsolete).
 * When a listener unregister, its entry is marked as obsolete in this map.
 * 
 * During firing of an event, the observer notifies all the listeners that are
 * active, the active status will be stored in a Boolean that's synchronized so
 * rare race conditions like calling notify on an active listener that has just
 * turned obsolete will not happen.
 * 
 * 
 */
public class MultithreadedObserverPattern <T extends AbstractListener> {

    interface Handler<T> {
        void handleEvent(T listener);
    }

    class BooleanHolder {
        boolean val;

        BooleanHolder(boolean v) {
            val = v;
        }

        void set(boolean v) {
            val = v;
        }

        boolean get() {
            return val;
        }
    }

    Map<AbstractListener, BooleanHolder> listeners = new HashMap<AbstractListener, BooleanHolder>();

    public void register(AbstractListener l) {
        synchronized (listeners) {
            listeners.put(l, new BooleanHolder(true));
        }
    }

    public void unregister(AbstractListener l) {
        synchronized (listeners) {
            BooleanHolder status = listeners.get(l);
            if (status != null) {
                // notify call also syncing on status
                synchronized (status) {
                    status.set(false);
                }
            }
            // set to false
        }
    }

    public void notifyAll(Handler handler) {
        // here we do not synchroznie on listeners to avoid tricky lock situations
        // make a copy of the map
        List<Entry<AbstractListener, BooleanHolder>> activeListeners = new ArrayList<Entry<AbstractListener, BooleanHolder>>();
        List<AbstractListener> inactiveListeners = new ArrayList<AbstractListener>(); 

        synchronized (listeners) {
            for (Entry<AbstractListener, BooleanHolder> entry : listeners.entrySet()) {
                if (entry.getValue().get()) {
                    activeListeners.add(entry);
                } else {
                    inactiveListeners.add(entry.getKey());
                }
            }
        }
         // call the method on active listener
        // 
        for (Entry<AbstractListener, BooleanHolder> e : activeListeners) {
            BooleanHolder status = e.getValue();
            // remove those listeners that are no longer active
            synchronized (status) {
                if (status.get()) {
                      handler.handleEvent(e.getKey());
                }
            }
        }

        synchronized (listeners) {
            // remove inactive listeners
            for (AbstractListener l : inactiveListeners) {
                listeners.remove(l);
            }
        }
    }
}
Share:
10,959

Related videos on Youtube

shiv chawla
Author by

shiv chawla

I am a finance professional and work with a money manager in Boston. I have an interest in trading and solving programming problems.

Updated on June 04, 2022

Comments

  • shiv chawla
    shiv chawla almost 2 years

    I have a question where a subject is updated in a different thread every time. So whenever the subject is updated it correspondingly updates the observer with the new information. However, if the list of observers is long, it will require some time to update all the observers. Think of a subject that gets updated very frequently. While the subject is updating the observers, "subject" object is locked and hence cannot be updated by a different thread. This will either create information traffic for subject or cause loss of information.

    Do you have any idea how these issues are handled in a multi-threaded environment? Also, Can anyone recommend some books on parallel programming with C++?

    • INS
      INS over 12 years
      it this a theoretical question? because there may be other solutions for your problem that don't involve the use of so called 'patterns'.
    • moki
      moki about 9 years
      subject does not need to be "locked" if the information it holds does not change during the update of the observers or the change does not have any consequences. Think of a read-only file as an example. As far as i can think, Multi-threading will not solve the issue if I have understood your point properly. Also other threads could wait until the subject becomes available again and not return after a timeout and there would be no "loss of information" but that depends on the context that is difficult to imagine by the amount of information you have provided in the description of your problem.
  • neverlord
    neverlord over 12 years
    Btw: libcppa supports publish/subscribe based group communication for actors. The Subject can be easily modeled as a local group. Any number of threads/actors (publishers) are sending to the group, any number of threads/actors (receivers) can receive by just joining the group.