Raising events on separate thread
Solution 1
100 ns is a very tough target to hit. I believe it will take a deep understanding of what you're doing and why to hit that kind of performance.
However, asynchronously invoking event subscribers is pretty easy to solve. It's already answered here by, who else, Jon Skeet.
foreach (MyDelegate action in multicast.GetInvocationList())
{
action.BeginInvoke(...);
}
edit: I should also mention that you need to be running on a real-time operating system to give tight performance guarantees to your users.
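One caveat worth adding: `Delegate.BeginInvoke` throws `PlatformNotSupportedException` on .NET Core and .NET 5+, so the loop above only works on .NET Framework. A minimal sketch of the same idea using `Task.Run` follows; the class and method names (`EventHandlerAsyncExtensions`, `InvokeEachOnThreadPool`) are made up for illustration, and nothing here addresses the 100 ns target.

```csharp
using System;
using System.Threading.Tasks;

public static class EventHandlerAsyncExtensions
{
    // Hypothetical helper (not from the original answer): queues every
    // subscriber to the thread pool and returns without waiting for them.
    public static Task[] InvokeEachOnThreadPool(
        this EventHandler handler, object sender, EventArgs e)
    {
        // Extension methods can be called on a null delegate (no subscribers).
        if (handler == null) return Array.Empty<Task>();

        Delegate[] subscribers = handler.GetInvocationList();
        var tasks = new Task[subscribers.Length];
        for (int i = 0; i < subscribers.Length; i++)
        {
            // Copy into a local so each lambda captures its own subscriber.
            var subscriber = (EventHandler)subscribers[i];
            tasks[i] = Task.Run(() => subscriber(sender, e));
        }
        return tasks;
    }
}
```

The returned tasks can be ignored for fire-and-forget behaviour, or passed to `Task.WaitAll` when the caller needs to know that every subscriber has finished.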
Solution 2
You can use these simple extension methods on your event handlers:
public static void Raise<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
if (handler != null) handler(sender, e);
}
public static void Raise(this EventHandler handler, object sender, EventArgs e) {
if (handler != null) handler(sender, e);
}
public static void RaiseOnDifferentThread<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}
public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) {
if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}
public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) {
return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken());
}
Usage:
public static void Test() {
myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty);
}
The cancellationToken is necessary to guarantee StartNew() actually uses a different thread, as explained here.
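As far as I know, the `StartNew` overloads that take no `TaskScheduler` argument schedule onto `TaskScheduler.Current`, so when called from inside a task running on a custom scheduler (a UI scheduler, for example) the work may not land on a thread-pool thread. A hedged sketch of a variant that names the scheduler explicitly is below; `StartNewOnThreadPool` is a hypothetical name, not part of the original answer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskFactoryExtensions
{
    // Hypothetical variant of StartNewOnDifferentThread: passing
    // TaskScheduler.Default explicitly targets the thread pool regardless
    // of which scheduler the caller is currently running on.
    public static Task StartNewOnThreadPool(this TaskFactory taskFactory, Action action)
    {
        return taskFactory.StartNew(
            action,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }
}
```

With these arguments the call behaves like `Task.Run(action)`, which is the shorter spelling on .NET 4.5 and later.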
Solution 3
It seems like you are looking for tasks. The following is an extension method I wrote for my job that asynchronously invokes an event so that the event handlers run on a separate thread. I can't comment on its speed, since that has never been a requirement for me.
UPDATE
Based on the comments, I adjusted it so that only one task is created to call all of the subscribers.
/// <summary>
/// Extension method to safely encapsulate asynchronous event calls with checks
/// </summary>
/// <param name="evnt">The event to call</param>
/// <param name="sender">The sender of the event</param>
/// <param name="args">The arguments for the event</param>
/// <remarks>
/// This method calls each event handler attached to the event. It uses <see cref="System.Threading.Tasks.Task"/> to
/// invoke the handlers asynchronously without any exception handling. As such, if any of the event handlers throws, the
/// exception goes unobserved: on .NET 4.0 the application will most likely crash when the task is collected, while from
/// .NET 4.5 onward unobserved task exceptions are ignored by default. This is an explicit decision since it is really in
/// the hands of the event handler creators to make sure they handle issues that occur due to their code. There isn't
/// really a way for the event raiser to know what is going on.
/// </remarks>
[System.Diagnostics.DebuggerStepThrough]
public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args )
{
// Used to make a temporary copy of the event to avoid possibility of
// a race condition if the last subscriber unsubscribes
// immediately after the null check and before the event is raised.
EventHandler handler = evnt;
if (handler != null)
{
// Manually calling all event handlers from a single task so that the raising thread
// returns immediately instead of waiting for the subscribers to finish.
var invocationList = handler.GetInvocationList();
Task.Factory.StartNew(() =>
{
foreach (EventHandler h in invocationList)
{
// Explicitly not catching any exceptions. While there are several possibilities for handling these
// exceptions, such as a callback, the correct place to handle the exception is in the event handler.
h.Invoke(sender, args);
}
});
}
}
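The method above deliberately lets exceptions go unhandled, which also means a throwing handler stops the handlers after it from running. For comparison, here is a rough sketch of the aggregating alternative; the name `InvokeAllAsync` and the decision to rethrow as an `AggregateException` are assumptions for illustration, not the author's actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SafeEventExtensions
{
    // Hypothetical helper: runs all subscribers on one thread-pool task,
    // keeps going when a subscriber throws, and reports every failure
    // through the returned task.
    public static Task InvokeAllAsync(this EventHandler handler, object sender, EventArgs args)
    {
        if (handler == null) return Task.CompletedTask;

        Delegate[] invocationList = handler.GetInvocationList();
        return Task.Run(() =>
        {
            List<Exception> errors = null;
            foreach (EventHandler h in invocationList)
            {
                try
                {
                    h(sender, args);
                }
                catch (Exception ex)
                {
                    // Remember the failure and continue with the next subscriber.
                    (errors ?? (errors = new List<Exception>())).Add(ex);
                }
            }
            if (errors != null) throw new AggregateException(errors);
        });
    }
}
```

A caller that cares about failures can await or `Wait()` the returned task and call `Flatten()` on the resulting `AggregateException` to reach the individual handler exceptions; a caller that does not can simply drop the task.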
Solution 4
I can't say whether this will reliably meet the 100 ns requirement, but here's an alternative: let each subscriber hand you a ConcurrentQueue that you fill and that they drain on their own thread.
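using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
class Program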
{
static void Main(string[] args)
{
var multicaster = new QueueMulticaster<int>();
var listener1 = new Listener(); //Make a couple of listening Q objects.
listener1.Listen();
multicaster.Subscribe(listener1);
var listener2 = new Listener();
listener2.Listen();
multicaster.Subscribe(listener2);
multicaster.Broadcast(6); //Send a 6 to both concurrent Queues.
Console.ReadLine();
}
}
//The listeners would run on their own thread and poll the Q like crazy.
class Listener : IListenToStuff<int>
{
public ConcurrentQueue<int> StuffQueue { get; set; }
public void Listen()
{
StuffQueue = new ConcurrentQueue<int>();
var t = new Thread(ListenAggressively);
t.Start();
}
void ListenAggressively()
{
while (true)
{
int val;
if(StuffQueue.TryDequeue(out val))
Console.WriteLine(val);
}
}
}
//Simple class that allows you to subscribe a Queue to a broadcast event.
public class QueueMulticaster<T>
{
readonly List<IListenToStuff<T>> _subscribers = new List<IListenToStuff<T>>();
public void Subscribe(IListenToStuff<T> subscriber)
{
_subscribers.Add(subscriber);
}
public void Broadcast(T value)
{
foreach (var listenToStuff in _subscribers)
{
listenToStuff.StuffQueue.Enqueue(value);
}
}
}
public interface IListenToStuff<T>
{
ConcurrentQueue<T> StuffQueue { get; set; }
}
Since you can't hold up processing on other listeners, this means multiple threads. Having dedicated listening threads on the listeners seems like a reasonable approach to try, and the concurrent queue seems like a decent delivery mechanism. In this implementation it's just constantly polling, but you could probably use thread signaling, with something like AutoResetEvent, to reduce the CPU load.
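To illustrate the signaling idea, here is a minimal sketch of a listener that blocks instead of spinning, using `BlockingCollection<T>` rather than a hand-rolled AutoResetEvent. `BlockingListener<T>` and `Post` are hypothetical names, and waking a blocked thread will typically cost far more than 100 ns, so this trades latency for lower CPU use.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical alternative to the polling Listener above.
public sealed class BlockingListener<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue =
        new BlockingCollection<T>(new ConcurrentQueue<T>());
    private readonly Thread _worker;

    public BlockingListener(Action<T> onItem)
    {
        _worker = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding() has been called and the queue is drained.
            foreach (T item in _queue.GetConsumingEnumerable())
                onItem(item);
        })
        { IsBackground = true };
        _worker.Start();
    }

    // Called by the broadcaster; returns as soon as the item is queued.
    public void Post(T value) => _queue.Add(value);

    public void Dispose()
    {
        _queue.CompleteAdding(); // let the worker finish the remaining items
        _worker.Join();
        _queue.Dispose();
    }
}
```

The broadcaster would call `Post` on each listener where the original code calls `StuffQueue.Enqueue`, and disposing a listener drains whatever is still queued before its thread exits.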
Hitesh P
Updated on July 09, 2022
Comments
-
Hitesh P almost 2 years
I am developing a component which needs to process a live feed and broadcast the data to the listeners very quickly (with about 100 nanosecond-level accuracy, or even less if I can manage it). Currently I raise an event from my code which subscribers can subscribe to. However, because C# event handlers run on the same thread that raises the event, my thread will be blocked until all subscribers finish processing the event. I do not have control over the subscribers' code, so they could do time-consuming operations in an event handler, which may block the broadcasting thread.
What can I do so that I can broadcast the data to the subscribers and still broadcast quickly?
-
Dave over 10 years
-
gunr2171 over 10 years
Sounds like you need to fire off the event in its own thread.
-
usr over 10 years
100 ns is about 300 instructions. Good luck getting cross-thread synchronization at these rates.
-
paparazzo over 10 years
I don't get how you don't have any control over the subscribers' code. Can other programs subscribe to an event?
-
Servy over 10 years
@Blam If you're writing library code that is going to be imported by all sorts of different types of consumers, each adding event handlers with arbitrary code...
-
user1703401 over 10 years
Very unrealistic goals. The number 100 only means something to humans, a machine doesn't care and counts with two fingers. Pick 256.
-
Enigmativity over 10 years
Take a look at Microsoft's Reactive Framework. It'll take a lot of pain out of this for you.
-
Servy over 10 years
Odds are he doesn't need to invoke each handler in its own thread, but rather invoke all of the handlers in a single new thread. This would significantly cut down on the overhead.
-
Servy over 10 years
If you find another duplicate question you should flag/vote to close, rather than posting an answer with a link.
-
dss539 over 10 years
+1 for extensive comments, however, this doesn't address his 100 ns constraint
-
dss539 over 10 years
@Servy The other question does not address the time constraint
-
Servy over 10 years
There's no point in explicitly copying the delegate here; it was copied when it was passed into the method.
-
Servy over 10 years
Another drawback of this particular method is that you need to write a new one for each type of delegate you have an event for, but there isn't really any good way around that problem, beyond just not extracting it out into a method in the first place.
-
Iddillian over 10 years
@Servy Probably. It wouldn't be hard to change it to act that way. I actually wrote another extension method, SafeInvoke, that aggregates all the exceptions, which I usually wrap in a task explicitly.
-
Servy over 10 years
Neither does your answer.
-
dss539 over 10 years
My instinct would be that he does indeed want to invoke each handler async so that 1 bad consumer doesn't delay messages to the other consumers.
-
Servy over 10 years
@dss539 You only really have one consuming application at a time. If there are multiple subscribers in a given application it's not really reasonable to expect to prevent a single poor one from harming the rest. Beyond that, the performance constraints would prohibit such a change. There just isn't enough time to do that.
-
dss539 over 10 years
Regarding null checking events - I find it's best to just subscribe an empty handler to every delegate to avoid all the extremely subtle race conditions.
public event EventHandler MyEvent = delegate { };
protect from so much potential for danger.
dss539 over 10 years
@Servy Who says you only have one consuming application at a time? And the OP's whole goal was to prevent one poor performer from harming the rest...
-
Servy over 10 years
@dss539 Yet it comes at a performance cost. Relevant in an application that's considering such tiny runtime speeds.
-
Iddillian over 10 years
@dss539 I seem to remember that when I originally came up with my SafeInvoke family of methods, I found a comment from Jon Skeet about why that wasn't a good idea
-
Servy over 10 years
@dss539 To quote the question, "which may block the thread which is broadcasting". His goal here is to ensure that the thread firing the event isn't blocked. Not that one handler doesn't block another. I looked at it closely, precisely because it wasn't clear which he wanted.
-
user1703401 over 10 years
+1, it does. 100 ns is indeed a very tough target to hit with BeginInvoke(). Just the context switch is already several thousand CPU cycles :)
-
dss539 over 10 years
@Servy I can see how you would interpret his words in that way. My understanding is that, overall, his goal is to ensure timely delivery of events as they happen to all consumers. This requires, as you mention, that the event dispatcher not block, but it additionally requires that the event handlers not block each other. Allowing the listeners to run in parallel doesn't necessarily reduce overall performance as long as the CPU has enough cores to truly run the listeners in parallel.
-
Servy over 10 years
@dss539 But it does, as it means that there is more time spent scheduling the various actions, and more time spent context switching. That can have a dramatic performance impact. While being able to run each individually may be nice, it's unlikely to be possible given the constraints.
-
dss539 over 10 years
@Servy but it doesn't - if you have 5 consumers and 8 CPU cores, where is the context switch? If you have listener threads/processes already running, then I don't see why there would be any context switching overhead.
-
Servy over 10 years
@dss539 The current thread still needs to spend the time scheduling the new operations to be run, at the very least, and you need to consider the case where there aren't more cores than operations being performed, as you have no assertion that that's the case.
-
dss539 over 10 years
@Servy A worker thread in a spinlock waiting for work requires no scheduling. I agree that if it were simply waiting on a WaitHandle that it would probably need the OS scheduler to get involved. And yes, this approach requires as many cores as you have listeners, so it may not work in the OP's situation.
-
Servy over 10 years
@dss539 But in this case you don't have a worker thread sitting in a spinlock waiting for work. To be in that situation you'd need to create your own special thread pool that had that mechanism so that it could be used here, and that work hasn't been done.
-
dss539 over 10 years
Instead of using an AutoResetEvent, you could just use BlockingCollection (msdn.microsoft.com/en-us/library/dd267312.aspx) to simplify it a little.