ConcurrentDictionary GetOrAdd async

10,464

Solution 1

GetOrAdd won't become an asynchronous operation because accessing the value of a dictionary isn't a long running operation.

What you can do however is simply store tasks in the dictionary, rather than the materialized result. Anyone needing the results can then await that task.

However, you also need to ensure that the operation is only ever started once, and not multiple times. To ensure that some operation runs only once, and not multiple times, you also need to add in Lazy:

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id", url => new Lazy<Task<Response>>(_httpClient.GetAsync(url))).Value;

Solution 2

The GetOrAdd method is not that great to use for this purpose. Since it does not guarantee that the factory runs only once, the only purpose it has is a minor optimization (minor since additions are rare anyway) in that it doesn't need to hash and find the correct bucket twice (which would happen twice if you get and set with two separate calls).

I would suggest that you check the cache first, if you do not find the value in the cache, then enter some form of critical section (lock, semaphore, etc.), re-check the cache, if still missing then fetch the value and insert into the cache.

This ensures that your backing store is only hit once; even if multiple requests get a cache miss at the same time, only the first one will actually fetch the value, the other requests will await the semaphore and then return early since they re-check the cache in the critical section.

Psuedo code (using SemaphoreSlim with count of 1, since you can await it asynchronously):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from catch
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}

Solution 3

Try this extension method:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

Instead of dict.GetOrAdd(key,key=>something(key)), you use await dict.GetOrAddAsync(key,async key=>await something(key)). Obviously, in this situation you just write it as await dict.GetOrAddAsync(key,something), but I wanted to make it clear.

In regards to concerns about preserving the order of operations, I have the following observations:

  1. Using the normal GetOrAdd will get the same effect if you look at the way it is implemented. I literally used the same code and made it work for async. Reference says

the valueFactory delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, GetOrAdd is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class

  1. SyncRoot is not supported in ConcurrentDictionary, they use an internal locking mechanism, so locking on it is not possible. Using your own lock mechanism works only for this extension method, though. If you use another flow (using GetOrAdd for example) you will face the same problem.

Solution 4

Probably using a dedicated memory cache (like the new or the old MemoryCache classes, or this third-party library) should be preferable to using a simple ConcurrentDictionary. Unless you don't really need commonly used functionality like time-based expiration, size-based compacting, automatic eviction of entries that are dependent on other entries that have expired, or dependent on mutable external resources (like files, databases etc). It should be noted though that the MemoryCache may still need some work in order to handle asynchronous delegates properly, since its out-of-the-box behavior is not ideal.

Below is a custom extension method GetOrAddAsync for ConcurrentDictionarys that have Task<TValue> values. It accepts a factory method, and ensures that the method will be invoked at most once. It also ensures that failed tasks are removed from the dictionary. This implementation is optimized for the case where getting an existing task happens frequently, and creating a new one happens rarely.

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        var newTaskTask = new Task<Task<TValue>>(async () =>
        {
            try { return await valueFactory(key).ConfigureAwait(false); }
            catch
            {
                ((ICollection<KeyValuePair<TKey, Task<TValue>>>)source)
                    .Remove(new KeyValuePair<TKey, Task<TValue>>(key, newTask));
                //source.TryRemove(KeyValuePair.Create(key, newTask)); // .NET 5
                throw;
            }
        });
        newTask = newTaskTask.Unwrap();
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask)
            newTaskTask.RunSynchronously(TaskScheduler.Default);
    }
    return currentTask;
}

Usage example:

var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();

var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
    return await _httpClient.GetAsync(url);
});

For removing the faulted tasks, this implementation uses the explicitly implemented ICollection<T>.Remove API. More info about this API can be found here. From .NET 5 onwards the new TryRemove(KeyValuePair<TKey, TValue> item) method could be used instead.

Btw in case supreme performance is required, you may want to take a look at the BitFaster.Caching third-party library. I've never used it personally, but the diagrams with its benchmarks look impressive.

Share:
10,464
Zeus82
Author by

Zeus82

I work as a C# Developer at a startup.

Updated on June 19, 2022

Comments

  • Zeus82
    Zeus82 almost 2 years

    I want to use something like GetOrAdd with a ConcurrentDictionary as a cache to a webservice. Is there an async version of this dictionary? GetOrAdd will be making a web request using HttpClient, so it would be nice if there was a version of this dictionary where GetOrAdd was async.

    To clear up some confusion, the contents of the dictionary will be the response from a call to a webservice.

    ConcurrentDictionary<string, Response> _cache
        = new ConcurrentDictionary<string, Response>();
    
    var response = _cache.GetOrAdd("id",
        (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse(); });