Concurrent Set Queue

Solution 1

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

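import com.google.common.base.Preconditions;
import com.google.common.collect.ForwardingSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    this.map = Preconditions.checkNotNull(map);
  }

  // ForwardingSet has no delegate-taking constructor; it asks for the
  // delegate through this method instead.
  @Override protected Set<E> delegate() {
    return map.keySet();
  }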

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

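All is not sunshine with this approach. We have no decent way to select a head element other than taking the first element returned by the backing map's iterator (effectively entrySet().iterator().next()), so elements are always removed from the front of the table and the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention. It also means the head is chosen by the map's iteration order, not by insertion order.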

Note: this code uses Guava in a few places.

Solution 2

There's not a built-in collection that does this. There are some concurrent Set implementations that could be used together with a concurrent Queue.

For example, add an item to the queue only after it has been successfully added to the set, and remove each item from the set as it is removed from the queue. Logically, the contents of the queue are then whatever is in the set; the queue is just used to track the order and to provide an efficient poll() and, with a BlockingQueue, a blocking take().
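The following is a minimal sketch of that pairing, not the answerer's actual code. The class name UniqueBlockingQueue is hypothetical, and the choice of ConcurrentHashMap.newKeySet() (Java 8+) and an unbounded LinkedBlockingQueue is an assumption made for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical name: a FIFO queue that ignores elements already waiting in it. */
final class UniqueBlockingQueue<E> {
  // The set decides membership; the queue only records arrival order.
  private final Set<E> members = ConcurrentHashMap.newKeySet();
  private final BlockingQueue<E> order = new LinkedBlockingQueue<>();

  /** Returns false if an equal element is already queued. */
  boolean offer(E element) {
    if (!members.add(element)) {
      return false; // duplicate: leave the queue untouched
    }
    return order.offer(element); // unbounded queue, so this does not fail for capacity
  }

  /** Blocks until an element is available, then releases it from the set. */
  E take() throws InterruptedException {
    E head = order.take();
    members.remove(head);
    return head;
  }

  /** Returns the head, or null if the queue is currently empty. */
  E poll() {
    E head = order.poll();
    if (head != null) {
      members.remove(head);
    }
    return head;
  }
}
```

One caveat with this sketch: between the moment a consumer takes an element from the queue and the moment it removes that element from the set, a concurrent offer() of an equal element is rejected even though the element has already left the queue. Whether that window matters depends on the application.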

Solution 3

I would use a synchronized LinkedHashSet until there was enough justification to consider alternatives. The primary benefit that a more concurrent solution could offer is lock splitting.
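As a rough illustration of the synchronized LinkedHashSet option, the sketch below guards one LinkedHashSet with a single lock. The class name SynchronizedUniqueQueue and its method set are hypothetical; this is one way it could look, not a definitive implementation.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

/** Hypothetical name: every method takes the same intrinsic lock. */
final class SynchronizedUniqueQueue<E> {
  // LinkedHashSet iterates in insertion order, and re-adding an element
  // that is already present does not change its position.
  private final LinkedHashSet<E> elements = new LinkedHashSet<>();

  /** Returns false if an equal element is already queued. */
  synchronized boolean offer(E element) {
    return elements.add(element);
  }

  /** Removes and returns the oldest element, or null if empty. */
  synchronized E poll() {
    Iterator<E> it = elements.iterator();
    if (!it.hasNext()) {
      return null;
    }
    E head = it.next();
    it.remove();
    return head;
  }

  /** Returns the oldest element without removing it, or null if empty. */
  synchronized E peek() {
    Iterator<E> it = elements.iterator();
    return it.hasNext() ? it.next() : null;
  }

  synchronized int size() {
    return elements.size();
  }
}
```

Because a single structure holds both membership and order, there is no window in which the two can disagree; the cost is that all callers contend on one lock.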

The simplest concurrent approach would be a ConcurrentHashMap (acting as a set) and a ConcurrentLinkedQueue. The ordering of operations would provide the desired constraint. An offer() would first perform a CHM#putIfAbsent() and, if successful, insert into the CLQ. A poll() would take from the CLQ and then remove the element from the CHM. This means that we consider an entry to be in our queue if it is in the map, and the CLQ provides the ordering. The performance could then be adjusted by increasing the map's concurrencyLevel. If you can tolerate additional raciness, then a cheap CHM#get() could act as a reasonable precondition (but it can suffer from being a slightly stale view).
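The ordering described above could be sketched roughly as follows. The class name ConcurrentUniqueQueue is hypothetical, and using Boolean.TRUE as the map value is an arbitrary choice for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical name: non-blocking FIFO queue with set-like uniqueness. */
final class ConcurrentUniqueQueue<E> {
  // The map is the source of truth for membership; the value is a placeholder.
  private final ConcurrentHashMap<E, Boolean> members = new ConcurrentHashMap<>();
  private final ConcurrentLinkedQueue<E> order = new ConcurrentLinkedQueue<>();

  /** Returns false if an equal element is already queued. */
  boolean offer(E element) {
    // Optional cheap pre-check; the answer may already be stale when we act on it.
    if (members.get(element) != null) {
      return false;
    }
    // The atomic putIfAbsent is what actually enforces uniqueness.
    if (members.putIfAbsent(element, Boolean.TRUE) != null) {
      return false;
    }
    return order.offer(element);
  }

  /** Removes and returns the head, or null if the queue is currently empty. */
  E poll() {
    E head = order.poll();
    if (head != null) {
      members.remove(head);
    }
    return head;
  }

  /** Membership is answered by the map, not by scanning the linked queue. */
  boolean contains(Object element) {
    return members.get(element) != null;
  }
}
```

Answering contains() from the map avoids the linear scan that ConcurrentLinkedQueue#contains() would need.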

Solution 4

A java.util.concurrent.ConcurrentLinkedQueue gets you most of the way there.

Wrap the ConcurrentLinkedQueue with your own class that checks for the uniqueness of an add. Your code has to be thread safe.

Solution 5

What do you mean by a concurrent queue with Set semantics? If you mean a truly concurrent structure (as opposed to a thread-safe structure) then I would contend that you are asking for a pony.

What happens, for instance, if you call put(element) and detect that the element is already there, but it is removed immediately afterwards? For instance, what does it mean in your case if offer(element) || queue.contains(element) returns false?

These kinds of things often need to be thought about slightly differently in a concurrent world, as often nothing is as it seems unless you stop the world (lock it down). Otherwise you are usually looking at something in the past. So, what are you actually trying to do?

Author: tomrtc

Updated on June 26, 2022

Comments

  • tomrtc
    tomrtc almost 2 years

    Maybe this is a silly question, but I cannot seem to find an obvious answer.

    I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue should simply ignore that value. If not for the thread safety, this would be trivial. Is there a data structure in Java, or maybe a code snippet on the interwebs, that exhibits this behavior?

  • Saurabh
    Saurabh almost 14 years
    Once wrapped, it might not be thread-safe anymore, even if it is based on a ConcurrentLinkedQueue.
  • tomrtc
    tomrtc almost 14 years
    My first-pass implementation does pretty much exactly this, but I am worried about the cost of invoking .contains() on a linked-list-backed queue. Also, synchronizing the queue methods completely negates the benefits of the underlying algorithm of ConcurrentLinkedQueue.
  • tomrtc
    tomrtc almost 14 years
    One of my implementations used a LinkedHashSet so that I only had one data structure and could depend on the order. However, the algorithm behind ConcurrentLinkedQueue is quite a bit more sophisticated than using synchronization locks, and I was wondering if there was a performant collection that had the additional uniqueness constraint.
  • brady
    brady almost 14 years
    How does this preserve the order of the Queue? The iteration order is dependent on the implementation of the backing map.
  • Gilbert Le Blanc
    Gilbert Le Blanc almost 14 years
    @Ambience: Based on the Javadoc for ConcurrentLinkedQueue, I'm guessing that the invocation time of the contains() method is bound by N. The only method you should have to synchronize in your ConcurrentSetQueue is the add method. The other methods are synchronized in the ConcurrentLinkedQueue.
  • brady
    brady almost 14 years
    @Ambience - There isn't such a collection. My technique allows you to use a concurrent Queue (either LinkedBlockingQueue if you need take() or ConcurrentLinkedQueue if you only need poll()), preserving FIFO ordering, while adding Set-like uniqueness.
  • Kevin Bourrillion
    Kevin Bourrillion almost 14 years
    Preserve what order? I don't see any requirement about order, unless it's assumed that he wants a FIFO queue... I've added a comment to ask.
  • tomrtc
    tomrtc almost 14 years
    Interesting observation, if not an answer =) Are you below the commenting point threshold?
  • Kevin Bourrillion
    Kevin Bourrillion about 11 years
    @NitsanWakart The question I was answering was how to get a Queue.