Fixed-size queue which automatically dequeues old values upon new enqueues


Solution 1

I would write a wrapper class that, on Enqueue, checks the Count and Dequeues when the count exceeds the limit.

public class FixedSizedQueue<T>
{
    private readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();
    private readonly object lockObject = new object();

    public int Limit { get; set; }

    public void Enqueue(T obj)
    {
        q.Enqueue(obj);
        lock (lockObject)
        {
            T overflow;
            while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
    }
}
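
Single-threaded, the trim-loop idiom behaves deterministically; a quick sketch of it (illustration only, using a ConcurrentQueue directly rather than the wrapper above):

```csharp
using System;
using System.Collections.Concurrent;

class Demo
{
    static void Main()
    {
        var q = new ConcurrentQueue<int>();
        const int limit = 3;

        foreach (var i in new[] { 1, 2, 3, 4, 5 })
        {
            q.Enqueue(i);
            // same trim loop as the wrapper: drop oldest entries past the limit
            while (q.Count > limit && q.TryDequeue(out _)) ;
        }

        Console.WriteLine(string.Join(",", q));   // 3,4,5
    }
}
```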

Solution 2

I'd go for a slight variant: extend ConcurrentQueue so that the LINQ extensions can be used on FixedSizedQueue.

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
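
Because the class *is* a ConcurrentQueue<T>, LINQ works on it directly. A usage sketch (the class above is repeated verbatim so the snippet compiles on its own):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size) { Size = size; }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}

class Demo
{
    static void Main()
    {
        var q = new FixedSizedQueue<int>(3);
        for (int i = 1; i <= 5; i++) q.Enqueue(i);   // keeps 3, 4, 5

        // LINQ extensions come for free via ConcurrentQueue<T> : IEnumerable<T>
        Console.WriteLine(q.Sum());   // 12
        Console.WriteLine(q.Max());   // 5
    }
}
```

Note that the `new` Enqueue is only called when the variable is statically typed as FixedSizedQueue<T>; through a ConcurrentQueue<T> reference the base Enqueue runs instead, which is exactly the hiding pitfall debated in the comments below.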

Solution 3

For anyone who finds it useful, here is some working code based on Richard Schneider's answer above:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
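
As the comments point out, this class exposes no way to read items back. A sketch that adds hypothetical TryDequeue and ToArray passthroughs (assumptions for illustration, not part of the original answer):

```csharp
using System;
using System.Collections.Concurrent;

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size) { Size = size; }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }

    // hypothetical read-side members, added here for illustration only
    public bool TryDequeue(out T item) { return queue.TryDequeue(out item); }
    public T[] ToArray() { return queue.ToArray(); }
}

class Demo
{
    static void Main()
    {
        var q = new FixedSizedQueue<int>(3);
        for (int i = 1; i <= 5; i++) q.Enqueue(i);
        Console.WriteLine(string.Join(",", q.ToArray()));   // 3,4,5
    }
}
```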

Solution 4

For what it's worth, here's a lightweight circular buffer with some methods marked for safe and unsafe use.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new InvalidOperationException("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

I like to use the Foo()/SafeFoo()/UnsafeFoo() convention:

  • Foo methods call UnsafeFoo as a default.
  • UnsafeFoo methods modify state freely without taking the lock; they should only call other unsafe methods.
  • SafeFoo methods call UnsafeFoo methods inside a lock.

It's a little verbose, but it makes errors, like calling unsafe methods outside a lock in a method which is supposed to be thread-safe, more apparent.
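
The convention also composes: a caller can take SyncRoot itself and combine several Unsafe calls into one atomic operation. A sketch against a trimmed copy of the buffer, keeping only the members the example needs (TryDequeue here is an illustrative helper, not part of the answer):

```csharp
using System;

// Trimmed copy of the CircularBuffer<T> above: only the members the sketch needs.
public class CircularBuffer<T>
{
    readonly int size;
    readonly object locker = new object();
    int count, head, rear;
    T[] values;

    public CircularBuffer(int max) { size = max; values = new T[size]; }

    static int Incr(int index, int size) { return (index + 1) % size; }

    public object SyncRoot { get { return locker; } }
    public int UnsafeCount { get { return count; } }

    public void SafeEnqueue(T obj) { lock (locker) { UnsafeEnqueue(obj); } }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;
        if (count == size) head = Incr(head, size);
        rear = Incr(rear, size);
        count = Math.Min(count + 1, size);
    }

    public T UnsafeDequeue()
    {
        T res = values[head];
        values[head] = default(T);
        head = Incr(head, size);
        count--;
        return res;
    }
}

class Demo
{
    // Dequeue only if an item is available, atomically with the emptiness check.
    public static bool TryDequeue<T>(CircularBuffer<T> buffer, out T item)
    {
        lock (buffer.SyncRoot)
        {
            if (buffer.UnsafeCount > 0) { item = buffer.UnsafeDequeue(); return true; }
            item = default(T);
            return false;
        }
    }

    static void Main()
    {
        var buf = new CircularBuffer<int>(3);
        buf.SafeEnqueue(42);
        int item;
        Console.WriteLine(TryDequeue(buf, out item) + " " + item);   // True 42
        Console.WriteLine(TryDequeue(buf, out item));                // False
    }
}
```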

Solution 5

My version is just a subclass of the normal Queue. Nothing special, but seeing everyone participating, and since it still fits the topic title, I might as well put it here. It also returns the dequeued value, just in case.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of items exceeds the capacity, the oldest one is automatically dequeued.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
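A usage sketch (the class is repeated so the snippet compiles on its own; note the caveat from the comments that default(T) is also returned when nothing was evicted, so for value types the two cases can be indistinguishable):

```csharp
using System;
using System.Collections.Generic;

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }

    public SizedQueue(int fixedCapacity) { FixedCapacity = fixedCapacity; }

    // Returns the evicted value when over capacity, otherwise default(T).
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
            return base.Dequeue();
        return default(T);
    }
}

class Demo
{
    static void Main()
    {
        var q = new SizedQueue<int>(2);
        Console.WriteLine(q.Enqueue(1));          // 0 (default(int): nothing evicted)
        Console.WriteLine(q.Enqueue(2));          // 0
        Console.WriteLine(q.Enqueue(3));          // 1 (the oldest value was evicted)
        Console.WriteLine(string.Join(",", q));   // 2,3
    }
}
```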
Author: Xaqron (Physician)

Updated on September 22, 2021

Comments

  • Xaqron
    Xaqron over 2 years

    I'm using ConcurrentQueue for a shared data structure whose purpose is holding the last N objects passed to it (a kind of history).

    Assume we have a browser and we want to have the last 100 browsed URLs. I want a queue which automatically drops (dequeues) the oldest (first) entry upon new entry insertion (enqueue) when the capacity gets full (100 addresses in history).

    How can I accomplish that using System.Collections?

    • Admin
      Admin almost 13 years
      It wasn't meant specifically for you, but for anyone who comes across this question and might find it useful. btw, it does talk about C# too. Did you manage to read all the answers (in 2 minutes) and figure out that there is no C# code there? Anyway, I am not sure myself, and hence it is a comment...
    • Admin
      Admin almost 13 years
      You can just wrap the methods in a lock. Given that they are fast, you can just lock the whole array. This is probably a dupe though. Searching for circular buffer implementations with C# code might find you something. Anyway, good luck.
  • Erwin Mayer
    Erwin Mayer almost 13 years
    Lock will not be very helpful if Dequeue is called at the same time from other threads that do not lock this...
  • Richard Schneider
    Richard Schneider almost 13 years
    q is private to the object, so that the lock will prevent other threads from simultaneous access.
  • Richard Schneider
    Richard Schneider almost 10 years
    What happens when TryDequeue fails for some reason?
  • KFL
    KFL almost 10 years
    It's not a good idea to lock. The whole purpose of the BCL concurrent collections is to provide lock-free concurrency for performance reasons. The locking in your code compromises that benefit. In fact I don't see a reason you need to lock the dequeue.
  • Richard Schneider
    Richard Schneider almost 10 years
    @KFL, need to lock because Count and TryDequeue are two independent operations that are not synced by the BCL concurrent collections.
  • 0b101010
    0b101010 over 9 years
    @RichardSchneider If you need to handle concurrency issues yourself then it would be a good idea to swap the ConcurrentQueue<T> object for a Queue<T> object which is more lightweight.
  • mhand
    mhand over 9 years
    What happens when someone statically knows the instance as a ConcurrentQueue<T>? They've just circumvented your 'new' keyword.
  • Dave Lawrence
    Dave Lawrence over 9 years
    @mhand If 'someone' wanted to do that, then they would have chosen to use a ConcurrentQueue<T> object to begin with... This is a custom storage class. Nobody is seeking for this to be submitted to the .NET framework. You've sought out to create a problem for the sake of it.
  • Dave Lawrence
    Dave Lawrence over 9 years
    @RichardSchneider I presume in that case the TryDequeue would be retried... but I can't see why it would fail. Only one thread can be dequeuing at any one time. I'd have concerns about its behaviour if multiple threads were enqueuing while one thread was dequeuing, as the count would be in a period of flux. However, as a lightweight approach to a fixed-size concurrent queue, it's not a bad approximation.
  • mhand
    mhand over 9 years
    my point is instead of subclassing maybe you should just wrap the queue... this enforces the desired behavior in all cases. Also, since it is a custom storage class, let's make it completely custom, only expose the operations we need, subclassing is the wrong tool here IMHO.
  • Dave Lawrence
    Dave Lawrence over 9 years
    @mhand Yeah I get what you're saying.. I could wrap a queue and expose the queue's enumerator so as to make use of Linq extensions.
  • Zorkind
    Zorkind over 9 years
    Would it be better to use a Mutex instead of a lock?
  • Admin
    Admin about 8 years
    How to get items from this queue? Another simple method for dequeue?
  • Chris Marisic
    Chris Marisic over 7 years
    I agree with @mhand: you shouldn't inherit ConcurrentQueue because the Enqueue method is not virtual. You should proxy the queue and implement the whole interface if desired.
  • KFL
    KFL about 7 years
    This is broken if used concurrently - what if a thread is preempted after calling _queue.Enqueue(obj) but before Interlocked.Increment(ref _count), and the other thread calls .Count? It would get a wrong count. I haven't checked for the other issues.
  • Daniel Leach
    Daniel Leach over 6 years
    I like this implementation but do note that when none have been added it returns default(T)
  • Stealth Rabbi
    Stealth Rabbi over 6 years
    This doesn't even let you get objects from the queue.
  • Odys
    Odys over 6 years
    Wouldn't it be better if we inherited from ConcurrentQueue and added new to the Enqueue method?
  • Josh
    Josh about 6 years
    Voting down for the reasons mentioned (locking when using a ConcurrentQueue is bad) in addition to not implementing any of the requisite interfaces for this to be a true collection.
  • Josh
    Josh about 6 years
    Voting down for locking on the queue. If you absolutely want to lock, a ReaderWriterLockSlim would be best (assuming you expect to take a read lock more often than a write lock). GetSnapshot also isn't needed. If you implement IReadOnlyCollection<T> (which you should for IEnumerable semantics), ToList() will serve the same function.
  • Josh
    Josh about 6 years
    If you use lock in this manner, you should use ReaderWriterLockSlim to prioritize your readers.
  • Gábor
    Gábor about 6 years
    Don't define your own queue, just use the inherited one. As written, you can actually do nothing else with the queue; all other functions but your new Enqueue will still call the original queue. In other words, although this answer is marked as accepted, it's completely and utterly broken.
  • Richard Schneider
    Richard Schneider almost 6 years
    @Odys @Gábor I would like to inherit from ConcurrentQueue but I can't override the Enqueue method.
  • Mubashar
    Mubashar over 5 years
    This calls base.Enqueue(obj) outside the lock; if some thread reads the values between enqueue and dequeue, it would get more than Size items.
  • jjhayter
    jjhayter almost 4 years
    The answer is wrong per MS's docs: "The ConcurrentQueue<T> and ConcurrentStack<T> classes do not use locks at all. Instead, they rely on Interlocked operations to achieve thread-safety." docs.microsoft.com/en-us/dotnet/standard/collections/… In the source code github.com/dotnet/runtime/blob/master/src/libraries/… there is an object lock BUT that is on the segment. github.com/dotnet/runtime/blob/master/src/libraries/…
  • jjhayter
    jjhayter almost 4 years
    I've added an example using ConcurrentQueue without locks.
  • jjhayter
    jjhayter almost 4 years
    The ConcurrentQueue handles the locks in its implementation, see the links in my answer.
  • Nils Lande
    Nils Lande over 3 years
    This is not good design. I never allow this use of 'new' in the codebase I'm responsible for. The problem: There are now actually two completely different methods with the name Enqueue(). If someone calls the base class (using the most generic variant of an object is good design) they get a completely different version of the method. I have seen a number of subtle bugs caused by this use of 'new', it should never be used.
  • jim tollan
    jim tollan over 2 years
    thanks for this.. i had an old piece of code from years ago that i'd used to lesser effect.. nice circular FIFO (y)
  • Dave Lawrence
    Dave Lawrence about 2 years
    Near ten years later and possibly a bit wiser - I'd downvote this answer
  • lonix
    lonix almost 2 years
    How do you access the queue? Right now it cannot be enumerated or used in LINQ. Did you expose methods for that, or actually expose the underlying _queue as public (in the answer it is private)?
  • jjhayter
    jjhayter almost 2 years
    @lonix the implementation didn't expose it for the express purpose of keeping the details from leaking.
  • lonix
    lonix almost 2 years
    Do you remember what you meant by "Enforce the capacity first so the head can be used instead of the entire segment (slow)"? Did you mean that dequeuing first avoids the possibility of the internal structure resizing to n+1?
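
The later comments from jjhayter describe a lock-free variant built on ConcurrentQueue plus Interlocked operations, with no external lock. This is a reconstruction of that idea, not the code those comments reference, and it still lets the count be transiently wrong between the Increment and the TryDequeue, as KFL points out above:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class LockFreeFixedSizedQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private int _count;   // tracked with Interlocked; avoids repeated ConcurrentQueue<T>.Count calls

    public int Limit { get; }

    public LockFreeFixedSizedQueue(int limit) { Limit = limit; }

    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        // If over the limit, evict one old item. The count can transiently
        // exceed Limit between the Increment and the TryDequeue.
        if (Interlocked.Increment(ref _count) > Limit && _queue.TryDequeue(out _))
            Interlocked.Decrement(ref _count);
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            Interlocked.Decrement(ref _count);
            return true;
        }
        return false;
    }

    public T[] ToArray() { return _queue.ToArray(); }
}

class Demo
{
    static void Main()
    {
        var q = new LockFreeFixedSizedQueue<int>(3);
        for (int i = 1; i <= 5; i++) q.Enqueue(i);
        Console.WriteLine(string.Join(",", q.ToArray()));   // 3,4,5
    }
}
```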