Raising events on separate thread


Solution 1

100 ns is a very tough target to hit. I believe it will take a deep understanding of what you're doing and why to hit that kind of performance.

However, asynchronously invoking event subscribers is easy enough. It's already been answered here by, who else, Jon Skeet.

foreach (MyDelegate action in multicast.GetInvocationList())
{
    action.BeginInvoke(...);
}
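
For reference, a fuller sketch of what that loop might look like, assuming the delegate has the standard (object sender, EventArgs e) shape (an assumption, since MyDelegate isn't shown); note that delegate BeginInvoke is only supported on the .NET Framework and throws PlatformNotSupportedException on .NET Core and later:

// Sketch only: assumes this runs inside the publishing object and 'multicast' is an EventHandler.
// Each handler runs on a thread-pool thread; always pair BeginInvoke with EndInvoke.
foreach (EventHandler action in multicast.GetInvocationList())
{
    action.BeginInvoke(this, EventArgs.Empty,
        ar => ((EventHandler)ar.AsyncState).EndInvoke(ar),
        action);
}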

Edit: I should also mention that you need to be running on a real-time operating system to give tight performance guarantees to your users.

Solution 2

You can use these simple extension methods on your event handlers:

public static void Raise<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) handler(sender, e);
}

public static void Raise(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) handler(sender, e);
}

public static void RaiseOnDifferentThread<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) {
    return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken());
}

Usage:

public static void Test() {
    myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty);
}

The cancellationToken is necessary to guarantee StartNew() actually uses a different thread, as explained here.
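
A slightly fuller usage sketch under the same assumptions (the FeedPublisher class and DataReceived event are made-up names for illustration):

public class FeedPublisher
{
    public event EventHandler<EventArgs> DataReceived;

    public void Publish()
    {
        // Synchronous raise: blocks until every subscriber has run.
        DataReceived.Raise(this, EventArgs.Empty);

        // Asynchronous raise: returns immediately, subscribers run on a thread-pool task.
        DataReceived.RaiseOnDifferentThread(this, EventArgs.Empty);
    }
}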

Solution 3

It seems like you are looking for tasks. The following is an extension method I wrote for my job that asynchronously invokes an event so that each event handler runs on its own thread. I can't comment on its speed, since that has never been a requirement for me.


UPDATE

Based on the comments, I adjusted it so that only one task is created to call all of the subscribers.

/// <summary>
/// Extension method to safely encapsulate asynchronous event calls with checks
/// </summary>
/// <param name="evnt">The event to call</param>
/// <param name="sender">The sender of the event</param>
/// <param name="args">The arguments for the event</param>
/// <remarks>
/// This method safely calls each event handler attached to the event. It uses <see cref="System.Threading.Tasks"/> to
/// invoke the handlers asynchronously without any exception handling. As such, if any of the event handlers throws an exception the application will
/// most likely crash when the task is collected. This is an explicit decision, since it is really in the hands of the event handler
/// creators to make sure they handle issues that occur due to their code. There isn't really a way for the event raiser to know
/// what is going on.
/// </remarks>
[System.Diagnostics.DebuggerStepThrough]
public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args )
{
    // Used to make a temporary copy of the event to avoid possibility of
    // a race condition if the last subscriber unsubscribes
    // immediately after the null check and before the event is raised.
    EventHandler handler = evnt;
    if (handler != null)
    {
        // Enumerate the invocation list so each subscriber is invoked individually
        // rather than through the multicast delegate.
        var invocationList = handler.GetInvocationList();

        Task.Factory.StartNew(() =>
        {
            foreach (EventHandler h in invocationList)
            {
                // Explicitly not catching any exceptions. While there are several possibilities for handling these 
                // exceptions, such as a callback, the correct place to handle the exception is in the event handler.
                h.Invoke(sender, args);
            }
        });
    }
}
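
A usage sketch (the Feed class and Tick event are made-up names for illustration):

public class Feed
{
    public event EventHandler Tick;

    public void OnNewData()
    {
        // The raising thread returns immediately; all subscribers run
        // sequentially on a single thread-pool task.
        Tick.AsyncSafeInvoke(this, EventArgs.Empty);
    }
}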

Solution 4

I can't speak to whether this will reliably meet the 100 ns requirement, but here's an alternative: you give the end user a way to hand you a ConcurrentQueue that you fill and that they listen to on a separate thread.

class Program
{
    static void Main(string[] args)
    {
        var multicaster = new QueueMulticaster<int>();

        var listener1 = new Listener(); //Make a couple of listening Q objects. 
        listener1.Listen();
        multicaster.Subscribe(listener1);

        var listener2 = new Listener();
        listener2.Listen();
        multicaster.Subscribe(listener2);

        multicaster.Broadcast(6); //Send a 6 to both concurrent Queues. 
        Console.ReadLine();
    }
}

//The listeners would run on their own thread and poll the Q like crazy. 
class Listener : IListenToStuff<int>
{
    public ConcurrentQueue<int> StuffQueue { get; set; }

    public void Listen()
    {
        StuffQueue = new ConcurrentQueue<int>();
        var t = new Thread(ListenAggressively);
        t.Start();

    }

    void ListenAggressively()
    {
        while (true)
        {
            int val;
            if(StuffQueue.TryDequeue(out val))
                Console.WriteLine(val);
        }
    }
}

//Simple class that allows you to subscribe a Queue to a broadcast event. 
public class QueueMulticaster<T>
{
    readonly List<IListenToStuff<T>> _subscribers = new List<IListenToStuff<T>>();
    public void Subscribe(IListenToStuff<T> subscriber)
    {
        _subscribers.Add(subscriber);
    }
    public void Broadcast(T value)
    {
        foreach (var listenToStuff in _subscribers)
        {
            listenToStuff.StuffQueue.Enqueue(value);
        }
    }
}

public interface IListenToStuff<T>
{
    ConcurrentQueue<T> StuffQueue { get; set; }
}

Given that you can't hold up processing of the other listeners, this means multiple threads. Having dedicated listening threads on the listeners seems like a reasonable approach to try, and the concurrent queue seems like a decent delivery mechanism. In this implementation it's just constantly polling, but you could probably use thread signaling to reduce the CPU load with something like AutoResetEvent.
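
As a rough sketch of that signaling idea (assuming BlockingCollection<int> in place of the bare ConcurrentQueue, as suggested in the comments below; the BlockingListener name is made up, and the multicaster would call Add instead of Enqueue):

// Sketch: same idea as Listener, but the thread blocks until an item arrives
// instead of spinning on TryDequeue.
class BlockingListener
{
    public BlockingCollection<int> StuffQueue { get; } = new BlockingCollection<int>(new ConcurrentQueue<int>());

    public void Listen()
    {
        var t = new Thread(() =>
        {
            // GetConsumingEnumerable blocks until an item is available
            // (or the collection is marked complete).
            foreach (var val in StuffQueue.GetConsumingEnumerable())
                Console.WriteLine(val);
        });
        t.IsBackground = true;
        t.Start();
    }
}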

Comments

  • Hitesh P
    Hitesh P almost 2 years

    I am developing a component which needs to process a live feed and broadcast the data to listeners pretty fast (with about 100 nanosecond level accuracy, even less than that if I can manage it). Currently I am raising an event from my code which subscribers can subscribe to. However, because in C# event handlers run on the same thread which raises the event, the thread raising the event will be blocked until all subscribers finish processing it. I do not have control over the subscribers' code, so they can possibly do time-consuming operations in the event handler, which may block the thread that is broadcasting.

    What can I do so that I can broadcast the data to the subscribers but still broadcast quite fast?

    • gunr2171
      gunr2171 over 10 years
      Sounds like you need to fire off the event in its own thread.
    • usr
      usr over 10 years
      100 ns is about 300 instructions. Good luck getting cross-thread synchronization at these rates.
    • paparazzo
      paparazzo over 10 years
      I don't get how you don't have any control over the subscribers' code. Can other programs subscribe to the event?
    • Servy
      Servy over 10 years
      @Blam If you're writing library code that is going to be imported by all sorts of different types of consumers, each adding event handlers with arbitrary code...
    • user1703401
      user1703401 over 10 years
      Very unrealistic goals. The number 100 only means something to humans; a machine doesn't care and counts with two fingers. Pick 256.
    • Enigmativity
      Enigmativity over 10 years
      Take a look at Microsoft's Reactive Framework. It'll take a lot of pain out of this for you.
  • Servy
    Servy over 10 years
    Odds are he doesn't need to invoke each handler in its own thread, but rather invoke all of the handlers in a single new thread. This would significantly cut down on the overhead.
  • Servy
    Servy over 10 years
    If you find another duplicate question you should flag/vote to close, rather than posting an answer with a link.
  • dss539
    dss539 over 10 years
    +1 for extensive comments; however, this doesn't address his 100 ns constraint
  • dss539
    dss539 over 10 years
    @Servy The other question does not address the time constraint
  • Servy
    Servy over 10 years
    There's no point in explicitly copying the delegate here; it was copied when it was passed into the method.
  • Servy
    Servy over 10 years
    Another drawback of this particular method is that you need to write a new one for each type of delegate you have an event for, but there isn't really any good way around that problem, beyond just not extracting it out into a method in the first place.
  • Iddillian
    Iddillian over 10 years
    @Servy Probably. It wouldn't be hard to change it to act that way. I actually wrote another extension method, SafeInvoke, that aggregates all the exceptions, which I usually wrap into a task explicitly.
  • Servy
    Servy over 10 years
    Neither does your answer.
  • dss539
    dss539 over 10 years
    My instinct would be that he does indeed want to invoke each handler async so that 1 bad consumer doesn't delay messages to the other consumers.
  • Servy
    Servy over 10 years
    @dss539 You only really have one consuming application at a time. If there are multiple subscribers in a given application, it's not really reasonable to expect to prevent a single poor one from harming the rest. Beyond that, the performance constraints would prohibit such a change. There just isn't enough time to do that.
  • dss539
    dss539 over 10 years
    Regarding null checking events - I find it's best to just subscribe an empty handler to every event to avoid all the extremely subtle race conditions. public event EventHandler MyEvent = delegate { }; protects from so much potential danger.
  • dss539
    dss539 over 10 years
    @Servy Who says you only have one consuming application at a time? And the OP's whole goal was to prevent one poor performer from harming the rest...
  • Servy
    Servy over 10 years
    @dss539 Yet it comes at a performance cost. Relevant in an application that's considering such tiny runtime speeds.
  • Iddillian
    Iddillian over 10 years
    @dss539 I seem to remember that when I originally came up with my SafeInvoke family of methods, I found a comment from Jon Skeet about why that wasn't a good idea
  • Servy
    Servy over 10 years
    @dss539 To quote the question, "which may block the thread which is broadcasting". His goal here is to ensure that the thread firing the event isn't blocked. Not that one handler doesn't block another. I looked at it closely, precisely because it wasn't clear which he wanted.
  • user1703401
    user1703401 over 10 years
    +1, it does. 100 ns is indeed a very tough target to hit with BeginInvoke(). Just the context switch is already several thousand CPU cycles :)
  • dss539
    dss539 over 10 years
    @Servy I can see how you would interpret his words in that way. My understanding is that, overall, his goal is to ensure timely delivery of events as they happen to all consumers. This requires, as you mention, that the event dispatcher not block, but it additionally requires that the event handlers not block each other. Allowing the listeners to run in parallel doesn't necessarily reduce overall performance as long as the CPU has enough cores to truly run the listeners in parallel.
  • Servy
    Servy over 10 years
    @dss539 But it does, as it means that there is more time spent scheduling the various actions, and more time spent context switching. That can have a dramatic performance impact. While being able to run each individually may be nice, it's unlikely to be possible given the constraints.
  • dss539
    dss539 over 10 years
    @Servy but it doesn't - if you have 5 consumers and 8 CPU cores, where is the context switch? If you have listener threads/processes already running, then I don't see why there would be any context switching overhead.
  • Servy
    Servy over 10 years
    @dss539 The current thread still needs to spend the time scheduling the new operations to be run, at the very least, and you need to consider the case where there aren't more cores than operations being performed, as you have no assertion that that's the case.
  • dss539
    dss539 over 10 years
    @Servy A worker thread in a spinlock waiting for work requires no scheduling. I agree that if it were simply waiting on a WaitHandle that it would probably need the OS scheduler to get involved. And yes, this approach requires as many cores as you have listeners, so it may not work in the OP's situation.
  • Servy
    Servy over 10 years
    @dss539 But in this case you don't have a worker thread sitting in a spinlock waiting for work. To be in that situation you'd need to create your own special thread pool that had that mechanism so that it could be used here, and that work hasn't been done.
  • dss539
    dss539 over 10 years
    Instead of using an AutoResetEvent, you could just use BlockingCollection msdn.microsoft.com/en-us/library/dd267312.aspx to simplify it a little.