Java concurrency: many writers, one reader


Solution 1

As Jack was alluding to, you can use the java.util.concurrent library, which includes ConcurrentHashMap and AtomicLong. You can put an AtomicLong into the map if the key is absent; otherwise, you increment the existing value. Since AtomicLong is thread safe, you can increment the counter without worrying about concurrency issues.

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        // Another thread may have inserted the key first; putIfAbsent returns
        // that existing counter, or null if our new AtomicLong(1) was stored.
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

This should be both fast and thread safe.

Edit: Refactored slightly so there are at most two lookups.

Solution 2

Why don't you use java.util.concurrent.ConcurrentHashMap<K, V>? It handles everything internally, avoiding unnecessary locks on the map and saving you a lot of work: you won't have to care about synchronizing get and put.

From the documentation:

A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.

You can specify its concurrency level:

The allowed concurrency among update operations is guided by the optional concurrencyLevel constructor argument (default 16), which is used as a hint for internal sizing. The table is internally partitioned to try to permit the indicated number of concurrent updates without contention. Because placement in hash tables is essentially random, the actual concurrency will vary. Ideally, you should choose a value to accommodate as many threads as will ever concurrently modify the table. Using a significantly higher value than you need can waste space and time, and a significantly lower value can lead to thread contention. But overestimates and underestimates within an order of magnitude do not usually have much noticeable impact. A value of one is appropriate when it is known that only one thread will modify and all others will only read. Also, resizing this or any other kind of hash table is a relatively slow operation, so, when possible, it is a good idea to provide estimates of expected table sizes in constructors.
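
For example (a small sketch of my own, not taken from the answer; the sizing numbers are purely illustrative), the three-argument constructor takes the initial capacity, the load factor, and the concurrency level:

// Roughly 1000 expected keys, the default load factor, and a hint that up to
// 32 threads may update the map at the same time (illustrative values).
private final ConcurrentMap<String, AtomicLong> stats =
        new ConcurrentHashMap<String, AtomicLong>(1000, 0.75f, 32);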

As suggested in the comments, read the documentation of ConcurrentHashMap carefully, especially where it states which operations are atomic and which are not.

To get a guarantee of atomicity you should consider which operations are atomic; from the ConcurrentMap interface you will know that:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

can be used safely.
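
As a hedged illustration (my own sketch, not part of the original answer; the stats field and notify method names are borrowed from the question), these atomic operations can be combined to increment a plain Long counter without locking the whole map:

private final ConcurrentMap<String, Long> stats = new ConcurrentHashMap<String, Long>();

public void notify(String key) {
    for (;;) {
        Long current = stats.get(key);
        if (current == null) {
            // putIfAbsent returns null only if our initial count of 1 was stored.
            if (stats.putIfAbsent(key, 1L) == null) {
                return;
            }
        } else if (stats.replace(key, current, current + 1)) {
            // The three-argument replace succeeds only if no other thread
            // changed the value in the meantime.
            return;
        }
        // Lost a race with another writer; retry.
    }
}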

Solution 3

I would suggest taking a look at Java's util.concurrent library. I think you can implement this solution a lot more cleanly. I don't think you need a map here at all. I would recommend implementing this using a ConcurrentLinkedQueue. Each 'producer' can freely write to this queue without worrying about the others. It can put an object on the queue with the data for its statistics.

The harvester can consume the queue, continually pulling data off and processing it. It can then store it however it needs.
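
A minimal sketch of that idea (my illustration, reusing the question's notify/getStats names; the answer itself does not show code): producers append to a lock-free queue, and the single harvester drains and aggregates it.

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueStatsService {
    private final Queue<String> events = new ConcurrentLinkedQueue<String>();

    public void notify(String key) {
        events.offer(key); // never blocks other writers
    }

    // Called only by the single harvester thread: drain and aggregate.
    public Map<String, Long> getStats() {
        Map<String, Long> counts = new HashMap<String, Long>();
        String key;
        while ((key = events.poll()) != null) {
            Long value = counts.get(key);
            counts.put(key, value == null ? 1L : value + 1L);
        }
        return counts;
    }
}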

Solution 4

Chris Dail's answer looks like a good approach.

Another alternative would be to use a concurrent Multiset. There is one in the Google Collections library. You could use this as follows:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

Looking at the source, this is implemented with a ConcurrentHashMap, using putIfAbsent and the three-argument version of replace to detect concurrent modifications and retry.
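
The answer only shows the write side; a possible read side (my assumption, not shown in the original answer) could drain the counters with Multiset.setCount, which returns the previous count, accepting that the snapshot is not atomic across keys:

public Map<String, Long> getStats() {
    Map<String, Long> copy = new HashMap<String, Long>();
    for (String key : stats.elementSet()) {
        // setCount returns the previous count and resets the key to zero;
        // this is atomic per key but not across the whole multiset.
        int count = stats.setCount(key, 0);
        if (count > 0) {
            copy.put(key, (long) count);
        }
    }
    return copy;
}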

Solution 5

A different approach to the problem is to exploit (trivial) thread safety via thread confinement. Basically, create a single background thread that takes care of both reading and writing. It has pretty good characteristics in terms of scalability and simplicity.

The idea is that instead of all the threads trying to update the data directly, they produce an "update" task for the background thread to process. The same thread can also do the read task, assuming some lag in processing updates is tolerable.

This design is pretty nice because the threads no longer have to compete for a lock to update the data, and since the map is confined to a single thread, you can simply use a plain HashMap for get/put, etc. In terms of implementation, it would mean creating a single-threaded executor and submitting write tasks, which may also perform the optional "collectAndSave" operation.

A sketch of code may look like the following:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                // (timeToDoCollectAndSave() and collectAndSave() are elided in this sketch)
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

There is a BlockingQueue associated with the executor, and each thread that produces a task for the StatsService uses that BlockingQueue. The key point is this: the locking duration for this operation should be much shorter than the locking duration in the original code, so the contention should be much lower. Overall it should result in much better throughput and latency.

Another benefit is that since only one thread reads and writes to the map, a plain HashMap and the primitive long type can be used (no ConcurrentHashMap or atomic types involved). This also greatly simplifies the code that actually processes the data.
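
The sketch above omits the read side. One way to keep the confinement (my addition, not part of the original answer; it assumes extra imports for java.util.concurrent.Callable and java.util.concurrent.Future in the class above) is to submit the read itself to the same single-threaded executor:

public Map<String, Long> getStats() throws Exception {
    // Run the snapshot-and-clear on the same confined thread that does the writes,
    // so the plain HashMap is never touched by two threads at once.
    Future<Map<String, Long>> snapshot = executor.submit(new Callable<Map<String, Long>>() {
        public Map<String, Long> call() {
            Map<String, Long> copy = new HashMap<String, Long>(stats);
            stats.clear();
            return copy;
        }
    });
    return snapshot.get(); // blocks the harvester until the task has run
}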

Hope it helps.



Comments

  • Janning
    Janning about 2 years

    I need to gather some statistics in my software and I am trying to make it fast and correct, which is not easy (for me!).

    First, my code so far, with two classes: a StatsService and a StatsHarvester.

    public class StatsService
    {
        private Map<String, Long> stats = new HashMap<String, Long>(1000);

        public void notify ( String key )
        {
            Long value = 1L;
            synchronized (stats)
            {
                if (stats.containsKey(key))
                {
                    value = stats.get(key) + 1;
                }
                stats.put(key, value);
            }
        }

        public Map<String, Long> getStats ( )
        {
            Map<String, Long> copy;
            synchronized (stats)
            {
                copy = new HashMap<String, Long>(stats);
                stats.clear();
            }
            return copy;
        }
    }
    

    This is my second class, a harvester, which collects the stats from time to time and writes them to a database.

    public class StatsHarvester implements Runnable
    {
        private StatsService statsService;
        private Thread t;

        public void init ( )
        {
            t = new Thread(this);
            t.start();
        }

        public synchronized void run ( )
        {
            while (true)
            {
                try
                {
                    wait(5 * 60 * 1000); // 5 minutes
                    collectAndSave();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }

        private void collectAndSave ( )
        {
            Map<String, Long> stats = statsService.getStats();
            // do something like:
            // saveRecords(stats);
        }
    }
    

    At runtime it will have about 30 concurrently running threads, each calling notify(key) about 100 times. Only one StatsHarvester is calling statsService.getStats().

    So I have many writers and only one reader. It would be nice to have accurate stats, but I don't care if some records are lost under high concurrency.

    The reader should run every 5 minutes or whatever is reasonable.

    Writing should be as fast as possible. Reading should be fast, but if it locks for about 300ms every 5 minutes, that's fine.

    I've read many docs (Java Concurrency in Practice, Effective Java, and so on), but I have the strong feeling that I need your advice to get it right.

    I hope I stated my problem clearly and briefly enough to get valuable help.


    EDIT

    Thanks to all for your detailed and helpful answers. As I expected, there is more than one way to do it.

    I tested most of your proposals (those I understood) and uploaded a test project to Google Code for further reference (a Maven project):

    http://code.google.com/p/javastats/

    I have tested different implementations of my StatsService:

    • HashMapStatsService (HMSS)
    • ConcurrentHashMapStatsService (CHMSS)
    • LinkedQueueStatsService (LQSS)
    • GoogleStatsService (GSS)
    • ExecutorConcurrentHashMapStatsService (ECHMSS)
    • ExecutorHashMapStatsService (EHMSS)

    and I tested them with x threads, each calling notify y times; the results are in ms:

              10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000
    GSS       1        5        17       7        21       117      7        37       254       Sum: 466
    ECHMSS    1        6        21       5        32       132      8        54       249       Sum: 508
    HMSS      1        8        45       8        52       233      11       103      449       Sum: 910
    EHMSS     1        5        24       7        31       113      8        67       235       Sum: 491
    CHMSS     1        2        9        3        11       40       7        26       72        Sum: 171
    LQSS      0        3        11       3        16       56       6        27       144       Sum: 266
    

    At this moment I think I will use ConcurrentHashMap, as it offers good performance and is quite easy to understand.

    Thanks for all your input! Janning

    • matt b
      matt b about 14 years
      I don't think it is a good idea to change the state of an object while holding a lock on it. Also there should be no need to synchronize the run() method in StatsHarvester.
    • Enno Shioji
      Enno Shioji about 14 years
      The synchronized keyword in the snippet below (in CHMSS) seems to be unnecessary, unless you are harvesting stats using multiple threads concurrently (obviously, if so, that seems odd). Even if you put synchronized there, it won't stop threads from calling notify() anyway. If I were you, I'd not bother, because the stats don't have to be "100%" accurate. If you use CHM, there is no way to lock the entire map. synchronized (stats) { copy = new HashMap<String, Number>(stats); stats.clear(); } return copy;
  • Ben Lings
    Ben Lings about 14 years
    It's worth noting that this approach could very easily lead to lost updates.
  • Jack
    Jack about 14 years
    According to Map contract it shouldn't allow lost updates, assuming also that putIfAbsent is executed atomically.
  • Ben Lings
    Ben Lings about 14 years
    To ensure that, you'll need to loop until replace(key, currentValue, currentValue+1) returns true.
  • Ben Lings
    Ben Lings about 14 years
    @BalusC: The docs say it returns "the previous value associated with the specified key, or null if there was no mapping for the key". Your one liner will throw a NullPointerException the first time a particular key is notified.
  • BalusC
    BalusC about 14 years
    @Ben: Yes, you're right (removed wrong comment). Still, this approach is a bit inefficient. I'd rewrite it like follows: AtomicLong value = stats.get(key); if (value == null) { value = new AtomicLong(0); stats.put(key, value); } value.incrementAndGet();. Now the map is scanned at least only once instead of at least two times.
  • jezg1993
    jezg1993 about 14 years
    @BalusC that's not atomic though. What if stats.put replaces a put by another thread in between your if and stats.put?
  • BalusC
    BalusC about 14 years
    Interesting, thanks for the link. Using putIfAbsent() is more a matter of threadsafety, not efficiency.
  • matt b
    matt b about 14 years
    you might want to consider mentioning putIfAbsent in your main post. While the individual operations of ConcurrentHashMap are thread-safe, compound operations naturally are not.
  • M. Jessup
    M. Jessup about 14 years
    In addition AtomicLong / AtomicInteger should be considered as the map values, this would eliminate the issue of lost increments (except potentially on the initial populations, unless the keys are all pre-populated or a call to putIfAbsent is included as pointed out by matt b.).
  • jezg1993
    jezg1993 about 14 years
    @Mattias you are doing a global lock on all writes then; you will punish the ConcurrentHashMap's throughput. The ConcurrentHashMap was written specifically to allow for concurrent writes on non-colliding entries. In the best-case scenario my solution will only block a single entry once per key.
  • Janning
    Janning about 14 years
    I do not understand your post. Sorry, but aren't you shifting the problem to another place? When you say "produce an update task", you have to put that in a map or queue or whatever data structure you like. And you have to synchronize this.
  • Sean Reilly
    Sean Reilly about 14 years
    According to javadoc, you don't need the first call to stats.get or the first null check. putIfAbsent will either return the previous mapped value (in which case you should increment), or null if there was no previous value, in which case the provided AtomicLong(1) was inserted.
  • jezg1993
    jezg1993 about 14 years
    Yes; however, continuously calling putIfAbsent is not the correct way of using ConcurrentHashMap. For every single putIfAbsent you will lock a single entry that is associated with the hashCode. You are doing a needless lock, whereas if you succeed with the initial get you will never block.
  • Janning
    Janning about 14 years
    Yes, much clearer. Thanks a lot. I upvoted your answer. I am going to test your approach, but I think the BlockingQueue must be synchronized in some way. So you move the synchronization from StatsService to the ExecutorService. But I will check it and edit my post to make my results available.
  • Saurabh
    Saurabh almost 14 years
    -1. This is a very inefficient solution. Also you are missing a put after the increment.
  • sjlee
    sjlee almost 14 years
    @finnw Thanks for pointing out the error in the code. I corrected it. As for the efficacy of the solution, (along with most things in life) it depends. When compared against using a ConcurrentHashMap (thus no synchronization being done), it does not perform better. However, if the original solution involves synchronization, this is a very useful alternative. Essentially, it's an asynchronous solution to the problem. Obviously it minimizes the latency from the viewpoint of the threads that generate these tasks. Second, it eliminates lock contention associated with the locking involved.
  • jezg1993
    jezg1993 over 9 years
  • user2800708
    user2800708 over 9 years
    Thanks, I have not really checked out Java 8 yet. Would be interesting to compare with my solution, which is very minimal, to check if it has any extra overhead in comparison. One thing that may differ, the counts for the threads, are they held in ThreadLocal? I avoided that by making each thread explicitly create its own Stats object. Less transparent/simple to use, but avoids the use of ThreadLocal (not that ThreadLocal is so slow any more).
  • user2800708
    user2800708 over 9 years
    This answer should really get some up-votes, because it is way faster than the other answers. I suppose in many cases extremely light-weight stats gathering may not be so important, as the tasks being measured may be so long and heavy weight. But if you really need your stats gathering to slow down the code as little as possible, this is the way to do it.
  • jezg1993
    jezg1993 over 9 years
    What you gain in throughput you lose in consistency. A classic trade-off. This flavor of solution would be best if you need an approximate value but not necessarily the exact value at a moment in time. The StampedLock docs even say something similar.
  • user2800708
    user2800708 over 9 years
    It doesn't matter. You can stop the world with a lock to get an 'exact' value, but the lock will be obtained in a non-deterministic fashion anyway, so the result is no more exact than mine. The important thing is that counts will not be counted twice or missed, and the memory fencing on the volatile ensures the reads and writes to it are consistent. Basically, if you have one thread reading and one writing, you don't need locks or atomics, a volatile is sufficient.
  • SatA
    SatA over 9 years
    @johnvint The code you provided has a problem - if value is null the putIfAbsent will also return null so value will stay null. So the code as it is written now is correct but misleading. You either shouldn't assign the return value from putIfAbsent into value or you should initialize value to some real value.
  • jezg1993
    jezg1993 over 9 years
    @SatA I am confused about your point. If the value is initially null (which means it doesn't exist yet in the map), we will try to place a new AtomicLong(1) into the map. The ConcurrentMap will return either null if the put was successful or the value that already existed in the map if the put wasn't successful. So when we get a non-null value from putIfAbsent that means another thread won in executing the put first, so we should incrementAndGet.
  • jezg1993
    jezg1993 over 9 years
    So the code as it is written now is correct but misleading. I disagree that it is misleading but I guess that's personal preference.
  • Victor Khovanskiy
    Victor Khovanskiy over 5 years
    There is a deadlock when you call r.lock() before w.unlock();