C# Parallel foreach - System.AggregateException: One or more errors occurred. ---> System.IndexOutOfRangeException

Solution 1

Found the answer. I had to neither change my logic to introduce ConcurrentBag, nor did I run into issues with large-volume inserts.

The mistake in the code was that I was adding data to the wrong list, i.e. sendsToday.Add(best);

Instead I had to add it to localStorage, the per-task local state that Parallel.ForEach passes into the loop body. Nothing else writes to it while the body runs, and it is merged under a lock in my final-storage delegate.

So instead of

if (best != null)
    sendsToday.Add(best);

The correct code is

if (best != null)
    localStorage.SubscriberSends.Add(best);

The final-storage delegate then merges each local list into sendsToday under a lock.
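The pattern described in this answer can be sketched roughly as follows. This is a simplified illustration, not the original code: the class LocalStateSketch, the method CollectEvenSquares, and the integer workload are hypothetical stand-ins for the subscriber and content types in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LocalStateSketch
{
    // Hypothetical workload: keep the squares of the even inputs.
    public static List<int> CollectEvenSquares(IEnumerable<int> source)
    {
        var results = new List<int>();   // shared list, only touched under the lock
        var mergeLock = new object();

        Parallel.ForEach(
            source,
            // localInit: each worker task gets its own private list.
            () => new List<int>(),
            // body: write only to the task-local list, never to the shared one.
            (item, loopState, local) =>
            {
                if (item % 2 == 0)
                    local.Add(item * item);
                return local;
            },
            // localFinally: runs once per task, so the lock is taken rarely.
            local =>
            {
                lock (mergeLock)
                {
                    results.AddRange(local);
                }
            });

        return results;
    }
}
```

Because the shared list is only written inside the final delegate, the lock is contended once per worker task rather than once per item. The order of the merged results is not guaranteed.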

Thanks anyway for all the support!

Solution 2

Inside your inner foreach you are adding to sendsToday, which is a List<>. This happens on every thread, and List<> is not safe for concurrent writes.

if (best != null)
    sendsToday.Add(best);

Instead of using List<>, try using one of the System.Collections.Concurrent implementations (e.g. ConcurrentBag). It handles all the locking internally for you and is specifically designed for threading.

var sendsToday = new ConcurrentBag<TSend>();
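As a rough, self-contained sketch of this suggestion (the class ConcurrentBagSketch, the method CollectEvenSquares, and the integer workload are hypothetical and only stand in for the types in the question):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConcurrentBagSketch
{
    // Hypothetical workload: keep the squares of the even inputs.
    public static ConcurrentBag<int> CollectEvenSquares(IEnumerable<int> source)
    {
        // ConcurrentBag<T> supports Add from many threads without an explicit lock.
        var results = new ConcurrentBag<int>();

        Parallel.ForEach(source, item =>
        {
            if (item % 2 == 0)
                results.Add(item * item);
        });

        return results;
    }
}
```

Note that ConcurrentBag<T> is unordered and has no AddRange method, so a final merge step written against List<T> would need to be removed or rewritten.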

Solution 3

Since the method that throws the exception is List<T>.Add(), my guess is that it is thrown here:

sendsToday.Add(best);

Add a lock around the list access and see if that helps.
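A minimal sketch of that idea, under the assumption of a simplified integer workload (LockedListSketch, CollectEvenSquares, and the gate object are hypothetical names). In the question's code, the body would have to take the same lock object that the final merge already uses, otherwise the two writers could still run at the same time.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LockedListSketch
{
    // Hypothetical workload: keep the squares of the even inputs.
    public static List<int> CollectEvenSquares(IEnumerable<int> source)
    {
        var results = new List<int>();
        var gate = new object();   // every writer to results must lock on this object

        Parallel.ForEach(source, item =>
        {
            if (item % 2 != 0)
                return;

            int value = item * item;   // do the work outside the lock

            lock (gate)
            {
                results.Add(value);    // only one thread mutates the list at a time
            }
        });

        return results;
    }
}
```

This keeps the List<T> but takes the lock once per added item, so it may contend more than the task-local approach when many items are added.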

Author by

Murtaza Mandvi

Updated on July 09, 2022

Comments

  • Murtaza Mandvi
    Murtaza Mandvi almost 2 years


    I know this has been asked a few times, but I simply cannot figure out why this throws an exception. Am I missing a lock somewhere?

    var sendsToday = new List<TSend>();
    
    var threads = _maxNumberOfThreads;
    if (threads <= 0)
        threads = 1;
    
    Parallel.ForEach(_subscribers,
        new ParallelOptions { MaxDegreeOfParallelism = threads },
        () => new ContentSendLocalStorage<TSend, TMedium>(_contentServices, _logService),
        (subscriber, loopState, localStorage) =>
        {
            localStorage.LogService.Warning(string.Format("Choosing content for subscriber {0}", subscriber.SubscriberId));
            foreach (var newsletterId in subscriber.SubscribedNewsletterIds)
            {
                localStorage.LogService.Warning(string.Format("Choosing content for newsletter {0}", newsletterId));
                var clicks = StateBag.Get<LookupList>(StateKeys.LookupList).Clicks.Where(c => c.Subscriber.SubscriberId == subscriber.SubscriberId).Select(c => c.Content.ContentId).ToArray();
    
                foreach (var contentService in _contentServices.Where(contentService => contentService.Contents.Count != 0))
                {
                    subscriber.UrlsClicked = contentService.Contents
                        .Where(c => clicks.Contains(c.ContentId))
                        .GroupBy(g => g.Page.Url)
                        .ToDictionary(k => k.Key, v => 1);
    
                    var best = contentService.GetBestForSubscriber(subscriber, new TMedium { MediumId = int.Parse(newsletterId) });
                    if (best != null)
                        sendsToday.Add(best);
                }
            }
    
            localStorage.LogService.Warning(string.Format("Done choosing content for subscriber {0}", subscriber.SubscriberId));
            return localStorage;
        },
        finalStorage =>
        {
            lock (ContentSendLock)
            {
                sendsToday.AddRange(finalStorage.SubscriberSends);
            }
        });
    

    I keep getting the exception below:

    System.AggregateException: One or more errors occurred. ---> System.IndexOutOfRangeException: Index was outside the bounds of the array.
       at System.Collections.Generic.List`1.Add(T item)
       at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
       at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
       at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
       at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )
       --- End of inner exception stack trace ---
       at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
       at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
       at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
       at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](TSource[] array, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
       at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
       at System.Threading.Tasks.Parallel.ForEach[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Func`1 localInit, Func`4 body, Action`1 localFinally)
       at WFM.Newsletter.Business.CORE.OptimizationEngine`2.Start()
       at WFM.Newsletter.Business.CORE.OptimizationService.Start(IEnumerable`1 subscribers, IEnumerable`1 newsletters, LookupList lookupList, LocalConfig config, ILogging logService, Int32 maxNumberOfThreads, String subscriberClicksSourceTableName)
    ---> (Inner Exception #0) System.IndexOutOfRangeException: Index was outside the bounds of the array.
       at System.Collections.Generic.List`1.Add(T item)
       at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
       at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
       at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
       at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---
    
    ---> (Inner Exception #1) System.IndexOutOfRangeException: Index was outside the bounds of the array.
       at System.Collections.Generic.List`1.Add(T item)
       at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
       at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
       at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
       at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---
    
    ---> (Inner Exception #2) System.IndexOutOfRangeException: Index was outside the bounds of the array.
       at System.Collections.Generic.List`1.Add(T item)
       at WFM.Newsletter.Business.CORE.OptimizationEngine`2.<>c__DisplayClassd.<Start>b__4(Subscriber subscriber, ParallelLoopState loopState, ContentSendLocalStorage`2 localStorage)
       at System.Threading.Tasks.Parallel.<>c__DisplayClass21`2.<ForEachWorker>b__1a(Int32 i, ParallelLoopState state, TLocal local)
       at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
       at System.Threading.Tasks.Task.InnerInvoke()
       at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
       at System.Threading.Tasks.Task.<>c__DisplayClass7.<ExecuteSelfReplicating>b__6(Object )<---
    
  • Murtaza Mandvi
    Murtaza Mandvi over 11 years
    So in that case I would not need finalStorage => ?
  • Mike Parkhill
    Mike Parkhill over 11 years
    That's right. Since all the threads would be writing directly to the collection you wouldn't need to aggregate at the end.
  • Murtaza Mandvi
    Murtaza Mandvi over 11 years
    I still doubt that it is a locking issue, since in this case .NET handles the parallel partitioning and I have a finalStorage that merges everything at the end, don't you think so? Just a thought.
  • Mike Parkhill
    Mike Parkhill over 11 years
    Inside your foreach you're adding directly to sendsToday with sendsToday.Add(best); so you're trying to change the List<> from every thread.