Wrapping ManualResetEvent as awaitable task

Solution 1

RegisterWaitForSingleObject will combine waits onto dedicated waiter threads, each of which can wait on multiple handles (specifically, 63 of them, which is MAXIMUM_WAIT_OBJECTS minus one for a "control" handle).

So you should be able to use something like this (warning: untested):

using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensions
{
    public static Task AsTask(this WaitHandle handle)
    {
        return AsTask(handle, Timeout.InfiniteTimeSpan);
    }

    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
        {
            var localTcs = (TaskCompletionSource<object>)state;
            if (timedOut)
                localTcs.TrySetCanceled();
            else
                localTcs.TrySetResult(null);
        }, tcs, timeout, executeOnlyOnce: true);
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;
    }
}
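
Because the AsTask extension above cancels the task on timeout, awaiting it throws when the timeout elapses. The following is a minimal sketch of one way to map that to a bool result, as the question asks for. The names WaitHandleTaskSketch and WaitOrTimeoutAsync are hypothetical, and AsTask is repeated only so that the sketch compiles on its own:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleTaskSketch
{
    // Same shape as the AsTask extension above, repeated so this sketch is self-contained.
    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
        {
            var localTcs = (TaskCompletionSource<object>)state;
            if (timedOut)
                localTcs.TrySetCanceled();
            else
                localTcs.TrySetResult(null);
        }, tcs, timeout, executeOnlyOnce: true);
        // Unregister the wait once the task has completed, whatever the outcome.
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;
    }

    // Hypothetical helper: true if the handle was signaled, false if the timeout elapsed.
    public static async Task<bool> WaitOrTimeoutAsync(WaitHandle handle, TimeSpan timeout)
    {
        try
        {
            await handle.AsTask(timeout).ConfigureAwait(false);
            return true;
        }
        catch (OperationCanceledException)
        {
            // AsTask reports a timeout by canceling the task.
            return false;
        }
    }
}
```

A caller would then write something like `if (await WaitHandleTaskSketch.WaitOrTimeoutAsync(manualResetEvent, TimeSpan.FromSeconds(1))) { ... }`. This sketch does not observe a CancellationToken.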

Solution 2

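You can also use SemaphoreSlim.WaitAsync(). Note, however, that every successful wait decrements the semaphore's count, so it behaves more like an AutoResetEvent than a ManualResetEvent.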
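
As noted in the comments, waiting on a SemaphoreSlim consumes the signal. Here is a small sketch of that behavior, using the SemaphoreSlim.WaitAsync overload that takes a timeout and a cancellation token. The class and method names are hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSignalSketch
{
    // Releases the semaphore once ("set"), then waits on it twice with a short timeout.
    public static async Task<(bool First, bool Second)> DemoAsync()
    {
        using (var signal = new SemaphoreSlim(0, 1))
        {
            signal.Release();

            // The first wait succeeds and takes the only available count.
            bool first = await signal.WaitAsync(TimeSpan.FromMilliseconds(100), CancellationToken.None);

            // The second wait times out, because nothing has released the semaphore again.
            bool second = await signal.WaitAsync(TimeSpan.FromMilliseconds(100), CancellationToken.None);

            return (first, second);
        }
    }
}
```

With a ManualResetEvent, both waits would succeed until Reset() is called, so this approach only fits when a single waiter should consume each signal.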

Solution 3

You can give this one a shot: https://www.badflyer.com/asyncmanualresetevent . I tried to build upon the example at https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx to support timeouts and cancellation.

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync
{
    // Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    // and the .net implementation of SemaphoreSlim

    /// <summary>
    /// The timeout value, in milliseconds, that means "wait indefinitely".
    /// </summary>
    private const int WaitIndefinitly = -1;

    /// <summary>
    /// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
    /// </summary>
    private readonly bool runSynchronousContinuationsOnSetThread = true;

    /// <summary>
    /// The current task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    public ManualResetEventAsync(bool isSet)
        : this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    /// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
    public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
    {
        this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;

        if (isSet)
        {
            this.completionSource.TrySetResult(true);
        }
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <returns>A task which completes when the event is set.</returns>
    public Task WaitAsync()
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event.</returns>
    public Task WaitAsync(CancellationToken token)
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));
    }

    /// <summary>
    /// Set the completion source.
    /// </summary>
    public void Set()
    {
        if (this.runSynchronousContinuationsOnSetThread)
        {
            this.completionSource.TrySetResult(true);
        }
        else
        {
            // Run synchronous completions in the thread pool.
            Task.Run(() => this.completionSource.TrySetResult(true));
        }
    }

    /// <summary>
    /// Reset the manual reset event.
    /// </summary>
    public void Reset()
    {
        // Grab a reference to the current completion source.
        var currentCompletionSource = this.completionSource;

        // Check if there is nothing to be done, return.
        if (!currentCompletionSource.Task.IsCompleted)
        {
            return;
        }

        // Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
        Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);
    }

    /// <summary>
    /// Await completion based on a timeout and a cancellation token.
    /// </summary>
    /// <param name="timeoutMS">The timeout in milliseconds.</param>
    /// <param name="token">The cancellation token.</param>
    /// <returns>A task (true if wait succeeded). (False on timeout).</returns>
    private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
    {
        // Validate arguments.
        // An int can never exceed int.MaxValue, so only the lower bound needs checking.
        if (timeoutMS < -1)
        {
            throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a non-negative ms value <= int.MaxValue");
        }

        CancellationTokenSource timeoutToken = null;

        // If the token cannot be cancelled, then we don't need to create any sort of linked token source.
        if (false == token.CanBeCanceled)
        {
            // If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set. 
            if (timeoutMS == -1)
            {
                return await this.completionSource.Task;
            }

            timeoutToken = new CancellationTokenSource();
        }
        else
        {
            // A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
            timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);
        }

        using (timeoutToken)
        {
            // Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
            Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) => { var e = result.Exception; }, TaskContinuationOptions.ExecuteSynchronously);

            var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);

            // The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
            if (resultingTask != delayTask)
            {
                // Cancel the timeout token to cancel the delay if it is still going.
                timeoutToken.Cancel();
                return true;
            }

            // Otherwise, the delay task finished. So throw if it finished because it was canceled.
            token.ThrowIfCancellationRequested();
            return false;
        }
    }
}

Solution 4

Stephen Cleary's solution looks perfect, and Microsoft provides a similar one.

However, I haven't seen an example with cancellation logic, so here is one:

using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensions
{
    public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
    {
        if (waitHandle == null)
            throw new ArgumentNullException(nameof(waitHandle));

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
        TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
            (_, timedOut) =>
            {
                if (timedOut)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(true);
                }
            }, 
            null, timeout, true);

        Task<bool> task = tcs.Task;

        _ = task.ContinueWith(_ =>
        {
            rwh.Unregister(null);
            return ctr.Unregister();
        }, CancellationToken.None);

        return task;
    }
}

Solution 5

Alternative solution: wait for the handles of the task and the manual reset event

I was having memory leaks when using Task.WaitAny() with a Task (returned by SqlConnection.OpenAsync()) and a ManualResetEvent that was received as a parameter and wrapped in a Task with AsTask(). These objects were not being disposed: TaskCompletionSource<Object>, Task<Object>, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaitHandleSafe, ContinuationResultTaskFromResultTask<Object,bool>, _ThreadPoolWaitOrTimerCallback.

This is real production code, used in a Windows service, of a function that tries to open a connection to a db in a loop until the connection is opened, or the operation fails, or the ManualResetEvent _finishRequest, received as parameter in the function containing this code, is signaled by code in any other thread.

To avoid the leak, I decided to do it the other way round: wait for the handles of the _finishRequest and the Task returned by OpenAsync():

Task asyncOpening = sqlConnection.OpenAsync();

// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]
{
  // index 0 in array: extract the AsyncWaitHandle from the Task
  ((IAsyncResult)asyncOpening).AsyncWaitHandle,
  // index 1:
  _finishRequest
};

// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;
// If so, break the loop to exit immediately
if (finishRequested)
  break;
                    
// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)
{
  // Extract the exception from the task, and throw it
  // NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
  // but you can check the exception itself, for example to see if it was a timeout,
  // if you specified it in the call to the async function that returns the Task
  var ex = asyncOpening?.Exception?.InnerExceptions?[0];
  if (ex != null) throw ex; 
}
else
{
  Log.Verbose("Connection to database {Database} on server {Server}", database, server);
  break;
}

If you also need a timeout, you can include it in the call to OpenAsync, or to your own async function, and then check whether the async operation was cancelled because of the timeout: check the status of the Task when it finishes, as described in the NOTE in the code comment.
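
The timeout variant described above could look roughly like the following sketch. It assumes the async operation accepts a CancellationToken, which is how the timeout is passed in here. The names WaitAnySketch, WaitForOperation and startOperation are hypothetical, and a Func stands in for the call to OpenAsync so that the sketch has no database dependency:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAnySketch
{
    // Blocks the calling thread until the operation finishes or finishRequest is signaled.
    // Returns "finish-requested", "timed-out", "faulted" or "completed".
    public static string WaitForOperation(
        Func<CancellationToken, Task> startOperation,
        WaitHandle finishRequest,
        TimeSpan timeout)
    {
        // The token source cancels itself after the timeout elapses.
        using (var timeoutSource = new CancellationTokenSource(timeout))
        {
            Task operation = startOperation(timeoutSource.Token);

            var waitHandles = new WaitHandle[]
            {
                ((IAsyncResult)operation).AsyncWaitHandle, // index 0: the task
                finishRequest                              // index 1: the event
            };

            int whichFinished = WaitHandle.WaitAny(waitHandles);
            if (whichFinished == 1)
                return "finish-requested";

            // Index 0 was signaled, so the task is complete and its status can be inspected.
            if (operation.IsCanceled)
                return "timed-out";
            if (operation.IsFaulted)
                return "faulted";
            return "completed";
        }
    }
}
```

For a database connection, the call might be `WaitAnySketch.WaitForOperation(ct => sqlConnection.OpenAsync(ct), _finishRequest, TimeSpan.FromSeconds(30))`. Like the original code, this blocks the calling thread while waiting.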

Author: noseratio

Updated on November 02, 2021

Comments

  • noseratio
    noseratio over 2 years

    I'd like to await on a manual reset event with time-out and observing cancellation. I've come up with something like below. The manual reset event object is provided by an API beyond my control. Is there a way to make this happen without taking on and blocking a thread from ThreadPool?

    static Task<bool> TaskFromWaitHandle(WaitHandle mre, int timeout, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            bool s = WaitHandle.WaitAny(new WaitHandle[] { mre, ct.WaitHandle }, timeout) == 0;
            ct.ThrowIfCancellationRequested();
            return s;
        }, ct);
    }
    
    // ...
    
    if (await TaskFromWaitHandle(manualResetEvent, 1000, cts.Token))
    {
        // true if event was set
    }
    else 
    {
        // false if timed out, exception if cancelled 
    }
    

    [EDITED] Apparently, it makes sense to use RegisterWaitForSingleObject. I'll give it a try.

  • noseratio
    noseratio over 10 years
    Thanks @StephenCleary. Is there a good reason to pass tcs as state here? Overall, would you consider this a preferred solution over using WaitHandle.WaitAny?
  • Stephen Cleary
    Stephen Cleary over 10 years
    I'm using the state parameters on RegisterWaitForSingleObject and ContinueWith as an optimization. This is preferred over WaitAny because this wait can be combined with other waits.
  • binki
    binki about 8 years
    @StephenCleary but why the type unsafety? TaskCompletionSource<RegisteredWaitHandle>.Task will still implicitly cast to Task just fine. Or if that wasn’t an option, you could get type safety by using automatic closure ;-).
  • Stephen Cleary
    Stephen Cleary about 8 years
    @binki: Yes, you could use TCS<RegisteredWaitHandle> for a typesafe ContinueWith call. The RegisterWaitForSingleObject still wouldn't be typesafe, though. And yes, you can use closures but that prevents the allocation optimizations.
  • Peter McEvoy
    Peter McEvoy over 6 years
    Hi, we've been using this technique as documented, but have discovered that Thread.CurrentPrincipal has been changed to a WindowsPrincipal after the await has finished. I think this means our synchronization/security context has been lost? Any idea how we can preserve it?
  • Stephen Cleary
    Stephen Cleary over 6 years
    @PeterMcEvoy: That depends on your hosting environment. If you're on pre-Core ASP.NET, then double-check all your settings and targets.
  • Peter McEvoy
    Peter McEvoy over 6 years
    Unfortunately we already have targetFramework="4.6.1" for httpRuntime and compilation.
  • Peter McEvoy
    Peter McEvoy over 6 years
    Ah - discovered the issue, we were setting/getting identity from Thread.CurrentPrincipal, but in a pure async controller (with no ConfigureAwait(false)) we really should be using HttpContext.Current.User as that is restored after awaiting
  • rollsch
    rollsch about 6 years
    The problem is there is no "Wait without decrement". Sometimes you just want to wait and not decrement the counter.
  • krimog
    krimog over 4 years
    The SemaphoreSlim is similar to AutoResetEvent, not ManualResetEvent
  • Igor Toropov
    Igor Toropov over 4 years
    Another problem is that calling Reset() may throw SemaphoreFullException if there is nothing left to release. Even when checking CurrentCount just before calling Release(), I have situations where multiple threads pass that check and try to release the semaphore. I am trying to avoid any possibility of exceptions, because they hurt performance.
  • Chris Moschini
    Chris Moschini over 4 years
    This should really be a part of the WaitHandle object in plain .Net, or if you like, something in System.Threading.Tasks
  • Stephen Cleary
    Stephen Cleary over 4 years
    @ChrisMoschini: I don't know for sure, but I suspect the reason this isn't built-in is because it's inefficient compared to an async-native implementation. So, while this is a useful technique in some rare scenarios, it shouldn't be encouraged generally.
  • Chris Moschini
    Chris Moschini over 4 years
    Well, here's the scenario I'm using it for: My web app has stuff it needs to initialize from the database. No need to make every Request thread wait on that, so it runs on a background Task after the app starts. Some pages depend on that data, which will be partly blank if a user hits them the moment the app is deployed, but otherwise fine. But if I debug those pages, and don't wait on that initial data, I get a blank page every time. So I'm using the above to await that before proceeding. Is there a better approach?
  • Stephen Cleary
    Stephen Cleary over 4 years
    @ChrisMoschini: Sounds like a one-time asynchronous signal. ManualResetEvent is a multi-use synchronous signal. It would work, sure, but I'd look into just using Task<T> instead of ManualResetEvent.
  • Chris Moschini
    Chris Moschini over 4 years
    The problem with that, is that the background Task it would be awaiting - if I just used the Task itself that is performing the work - has not necessarily been created or started, when the first Request thread comes asking. ManualResetEvent.AsTask (with a signal that just signals once to Go and never switches back) allows me to create the Task immediately and guarantee any Request thread that comes asking won't find a null Task and create a Race Condition.
  • Stephen Cleary
    Stephen Cleary over 4 years
    @ChrisMoschini: If you cannot guarantee that the Task variable would be set before requests arrive, you could use a TaskCompletionSource<T>.
  • JotaBe
    JotaBe over 3 years
    @StephenCleary I had memory leak problems with this implementation. I received a ManualResetEvent as a parameter in a function, used AsTask on it, and passed the result, along with a Task, as parameters to Task.WaitAny(). I then found a lot of undisposed TaskCompletionSource<Object>, Task<Object>, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaitHandleSafe, ContinuationResultTaskFromResultTask<Object,bool> and _ThreadPoolWaitOrTimerCallback objects. I couldn't understand what was not disposed. Any idea why?
  • Stephen Cleary
    Stephen Cleary over 3 years
    @JotaBe: I have a more modern approach these days. Both approaches assume that the handle will eventually get signalled (or time out). If it is not, then the task will not be completed. So if you have a loop or something using it in a *Any method, your code could be starting a new wait each time instead of using the existing one.
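
The suggestion in the comments to use a TaskCompletionSource<T> for a one-time asynchronous signal, instead of a ManualResetEvent, can be sketched as follows. The class and member names are hypothetical, and the string payload is just a placeholder for whatever data is being initialized:

```csharp
using System;
using System.Threading.Tasks;

public sealed class StartupData
{
    // Created eagerly, so callers never observe a null task, even before the work has started.
    // RunContinuationsAsynchronously keeps awaiting code from running inline on the signaling thread.
    private readonly TaskCompletionSource<string> loaded =
        new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Request code awaits this task; it completes once, when the data is ready.
    public Task<string> WhenLoaded
    {
        get { return this.loaded.Task; }
    }

    // Called by the background initialization when it succeeds.
    public void MarkLoaded(string data)
    {
        this.loaded.TrySetResult(data);
    }

    // Called by the background initialization when it fails, so waiters see the error.
    public void MarkFailed(Exception error)
    {
        this.loaded.TrySetException(error);
    }
}
```

A request handler would then write `string data = await startupData.WhenLoaded;`, and no wait handle or thread-pool wait registration is involved.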