Thread-safe circular buffer in Java

22,611

Solution 1

Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());

Solution 2

Here's a lock-free ring buffer implementation. It implements a fixed-size buffer - there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering rather than getting your data structure to filter.

/**
 * Container
 * ---------
 * 
 * A lock-free container that offers a close-to O(1) add/remove performance.
 * 
 */
public class Container<T> implements Iterable<T> {

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // TESTING {
  AtomicLong totalAdded = new AtomicLong(0);
  AtomicLong totalFreed = new AtomicLong(0);
  AtomicLong totalSkipped = new AtomicLong(0);

  private void resetStats() {
    totalAdded.set(0);
    totalFreed.set(0);
    totalSkipped.set(0);
  }
  // TESTING }

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    // Clear stats.
    resetStats();
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    totalAdded.incrementAndGet();
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // Keep count of skipped.
    totalSkipped.addAndGet(skipped);
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    totalFreed.incrementAndGet();
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    public T get () {
      return element;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      // Made into a `while` loop to fix issue reported by @Nim in code review
      while (next == null && limit > 0) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    Separator comma = new Separator(",");
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(comma.sep()).append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

Note that this is close to O1 put and get. A Separator just emits "" first time around and then its parameter from then on.

Edit: Added test methods.

// ***** Following only needed for testing. *****
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

private static synchronized void log(boolean toStdoutToo, String s) {
  if (Debug) {
    if (toStdoutToo) {
      System.out.println(s);
    }
    log(s);
  }
}

private static synchronized void log(String s) {
  if (Debug) {
    try {
      log.writeLn(logName, s);
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}
static volatile boolean testing = true;

// Tester object to exercise the container.
static class Tester<T> implements Runnable {
  // My name.

  T me;
  // The container I am testing.
  Container<T> c;

  public Tester(Container<T> container, T name) {
    c = container;
    me = name;
  }

  private void pause() {
    try {
      Thread.sleep(0);
    } catch (InterruptedException ex) {
      testing = false;
    }
  }

  public void run() {
    // Spin on add/remove until stopped.
    while (testing) {
      // Add it.
      Node<T> n = c.add(me);
      log("Added " + me + ": " + c.toString());
      pause();
      // Remove it.
      c.remove(n, me);
      log("Removed " + me + ": " + c.toString());
      pause();
    }
  }
}
static final String[] strings = {
  "One", "Two", "Three", "Four", "Five",
  "Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);

public static void main(String[] args) throws InterruptedException {
  Debug = true;
  log.delete(logName);
  Container<String> c = new Container<String>(10);

  // Simple add/remove
  log(true, "Simple test");
  Node<String> it = c.add(strings[0]);
  log("Added " + c.toString());
  c.remove(it, strings[0]);
  log("Removed " + c.toString());

  // Capacity test.
  log(true, "Capacity test");
  ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  // Fill it.
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  // Add one more.
  try {
    c.add("Wafer thin mint!");
  } catch (IllegalStateException ise) {
    log("Full!");
  }
  c.clear();
  log("Empty: " + c.toString());

  // Iterate test.
  log(true, "Iterator test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
  }
  StringBuilder all = new StringBuilder ();
  Separator sep = new Separator(",");
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("All: "+all);
  for (int i = 0; i < strings.length; i++) {
    c.remove(nodes.get(i), strings[i]);
  }
  sep.reset();
  all.setLength(0);
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("None: " + all.toString());

  // Multiple add/remove
  log(true, "Multi test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  log("Filled " + c.toString());
  for (int i = 0; i < strings.length - 1; i++) {
    c.remove(nodes.get(i), strings[i]);
    log("Removed " + strings[i] + " " + c.toString());
  }
  c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  log("Empty " + c.toString());

  // Multi-threaded add/remove
  log(true, "Threads test");
  c.clear();
  for (int i = 0; i < TEST_THREADS; i++) {
    Thread t = new Thread(new Tester<String>(c, strings[i]));
    t.setName("Tester " + strings[i]);
    log("Starting " + t.getName());
    t.start();
  }
  // Wait for 10 seconds.
  long stop = System.currentTimeMillis() + 10 * 1000;
  while (System.currentTimeMillis() < stop) {
    Thread.sleep(100);
  }
  // Stop the testers.
  testing = false;
  // Wait some more.
  Thread.sleep(1 * 100);
  // Get stats.
  double added = c.totalAdded.doubleValue();
  double skipped = c.totalSkipped.doubleValue();
  //double freed = c.freed.doubleValue();
  log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}

Solution 3

Maybe you want to look at Disruptor - Concurrent Programming Framework.

  • Find a paper describing the alternatives, design and also a performance comparement to java.util.concurrent.ArrayBlockingQueue here: pdf
  • Consider to read the first three articles from BlogsAndArticles

If the library is too much, stick to java.util.concurrent.ArrayBlockingQueue

Solution 4

I would have a look at ArrayDeque, or for a more concurrent implementation have a look at the Disruptor library which is one of the most sophisticated/complex ring buffer in Java.

An alternative is to use an unbounded queue which is more concurrent as the producer never needs to wait for the consumer. Java Chronicle

Unless your needs justify the complexity, an ArrayDeque may be all you need.

Solution 5

Also have a look at java.util.concurrent.

Blocking queues will block until there is something to consume or (optionally) space to produce:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

Concurrent linked queue is non-blocking and uses a slick algorithm that allows a producer and consumer to be active concurrently:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

Share:
22,611

Related videos on Youtube

Adam Matan
Author by

Adam Matan

Team leader, developer, and public speaker. I build end-to-end apps using modern cloud infrastructure, especially serverless tools. My current position is R&amp;D Manager at Corvid by Wix.com, a serverless platform for rapid web app generation. My CV and contact details are available on my Github README.

Updated on November 23, 2020

Comments

  • Adam Matan
    Adam Matan over 3 years

    Consider a few web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose role is keeping the last N requests from all servers.

    For example (N=3):

    Server a: "Request id = ABCD"        Status keeper=["ABCD"]
    Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"] 
    Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
    Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
    Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]
    

    At any point in time, the "Status keeper" might be called from a monitoring application that reads these last N requests for an SLA report.

    What's the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?

    CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I'm not sure what's the optimal way to implement efficient concurrency.

    • Emil Vikström
      Emil Vikström almost 12 years
      Define "higher priority". What if the report have started reading the buffer? Should it break and start over if someone wants to write it? Can that in turn lead to starvation?
    • Adam Matan
      Adam Matan almost 12 years
      It should never starve and it should never be stopped, but it can wait a bit longer - meaning that its priority should slowly increase over time.
    • bestsss
      bestsss almost 12 years
      How many producers and how many consumers should the ring buffer have, I will drop some code when you provide data.
    • Adam Matan
      Adam Matan almost 12 years
      Reading is quite scarce, probably not more than once in a few minutes. Writing is very often, might peak to hundreds of calls per second. I can tolerate rare discrepancies.
    • fishinear
      fishinear almost 12 years
      @Adam And how large is N typically? With the example N=3, I would not go with a central "status keeper" server approach, as 99% of the data sent to it is discarded.
    • Adam Matan
      Adam Matan almost 12 years
      @fishinear Somewhere between 1,000 and 10,000.
    • fishinear
      fishinear almost 12 years
      With those low numbers, I would simply go for a synchronized CircularFifoBuffer, as @maydeTo suggests. You wont introduce any significant bottlenecks with that.
  • thkala
    thkala almost 12 years
    One important issue: ArrayDeque is not size-limited. It uses a circular array, true, but it will resize to accommodate more elements as necessary. The OP would have to manually pop() an element before inserting a new one after a while, all while also explicitly maintaining thread-safety...
  • Vishy
    Vishy almost 12 years
    If you need it to be size limited you can use ArrayBlockingQueue.
  • thkala
    thkala almost 12 years
    ArrayBlockingQueue limits its size by blocking until an element is removed. As far as I can tell, the OP wants the queue to implicitly drop/overwrite the oldest element, only keeping the latest N elements.
  • Vishy
    Vishy almost 12 years
    You could be right, this doesn't a sound like a safe behaviour IMHO. Correctness is usually more important than performance and randomly discarding items in a queue in a non-reproduceable manner isn't the sort of thing I would suggest anyone do.
  • thkala
    thkala almost 12 years
    It's not how I would do it either. I'd rather store everything in a database and explicitly purge old records. That would have the added benefit of allowing for more complex queries that might be needed in the future...
  • thkala
    thkala almost 12 years
    Do you have any formal verification for the correctness of this algorithm? Lock-free data structures are notoriously hard to get right, unless you avoid reusing nodes...
  • OldCurmudgeon
    OldCurmudgeon almost 12 years
    @thkala - how 'formal' do you need? The primary algorithm is in the getFree method which selects a free node and marks it for use. It is quite simple and its correctness should be self-evident. I have added my test methods. Perhaps they will help.
  • thkala
    thkala almost 12 years
    The kind of 'formal' that published and peer-reviewed algorithms have. I have worked with lock-free data structures extensively and they can be extremely difficult to get right. There are just too many corner cases...
  • OldCurmudgeon
    OldCurmudgeon almost 12 years
    Sorry but I couldn't resist ... there are no corner cases in a ring. <grin> ... Seriously though, I have not formally published it (except here) or had it peer-reviewed. If you wish I will post it on CodeReview and see what they say. I have been using it in a live environment for quite a while now and it seems to be reliable. I know there is some sloppiness in the statistics it collects but they are not part of the algorithm.
  • MahdeTo
    MahdeTo almost 12 years
    Doesn't really matter as long as the initialization code isn't raceable
  • bestsss
    bestsss almost 12 years
    Disruptor library which is the most sophisticated/complex ring buffer in Java. That's a point of view I presume. The guy doesn't need handoff at all, so disruptor is not fit for the case. Even w/ hand-off flat-combining will outperform producer contended disruptor (it may not be strict FIFO amongst the producers)
  • Vishy
    Vishy almost 12 years
    @bestsss I have qualified that statement. I agree its not the right solution in every case.
  • android developer
    android developer about 7 years
    Where is BufferUtils from? I tried using this from Apache, in gradle file: "compile 'org.apache.commons:commons-collections4:4.1'" , but it's not there...
  • Sometimes_Confused
    Sometimes_Confused over 5 years
    @androiddeveloper In Apache commons collections4 CircularFifoBuffer has been replaced with CircularFifoQueue which can be synchronized by wrapping it with QueueUtils.
  • android developer
    android developer over 5 years
    @Sometimes_Confused I see. Why isn't there a concurrent solution, though?
  • Sometimes_Confused
    Sometimes_Confused over 5 years
    @androiddeveloper There is a solution that handles concurrency. As with the (newer) JDK collections, the basic Queue collection is unsynchronized for performance. If a collection is desired that handles concurrency (i.e., thread-safety), then the collection should be wrapped using the proper synchronized wrapper; see QueueUtils.synchronizedQueue(Queue<E> queue) in the case of the CircularFifoQueue (as my previous comment states).
  • android developer
    android developer over 5 years
    @Sometimes_Confused synchronized solution isn't good for concurrency... It's good just for thread safety... Concurrency mean that multiple threads can perform "problematic" operations at the same time, without making other threads wait for them to finish.
  • Sometimes_Confused
    Sometimes_Confused over 5 years
    @androiddeveloper A synchronized solution is one solution intended to handle concurrency. It is not the only solution, but it is one solution. Apache commons is just following the pattern used by the JDK in offering synchronized collection wrappers for use in thread safety.
  • android developer
    android developer over 5 years
    @Sometimes_Confused Synchronization is a solution, yes, but it's also the most naive one...
  • Thomas W
    Thomas W almost 3 years
    Insertion and deletion look fine, since the Node.free flag is CAS'ed and this also ensures inter-thread visibility of Node.element. However there are probably design issues which show up with the iterator -- since free and element are independent fields, UsedNodesIterator cannot avoid race issues when reading. I believe most java.util.concurrent collections combine 'element' and 'free' state into a single field to allow race-free reading, and disallow null elements to assist this.