java concurrency: many writers, one reader
Solution 1
As jack was eluding to you can use the java.util.concurrent library which includes a ConcurrentHashMap and AtomicLong. You can put the AtomicLong in if absent else, you can increment the value. Since AtomicLong is thread safe you will be able to increment the variable without worry about a concurrency issue.
public void notify(String key) {
AtomicLong value = stats.get(key);
if (value == null) {
value = stats.putIfAbsent(key, new AtomicLong(1));
}
if (value != null) {
value.incrementAndGet();
}
}
This should be both fast and thread safe
Edit: Refactored sligthly so there is only at most two lookups.
Solution 2
Why don't you use java.util.concurrent.ConcurrentHashMap<K, V>
? It handles everything internally avoiding useless locks on the map and saving you a lot of work: you won't have to care about synchronizations on get and put..
From the documentation:
A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.
You can specify its concurrency level:
The allowed concurrency among update operations is guided by the optional concurrencyLevel constructor argument (default 16), which is used as a hint for internal sizing. The table is internally partitioned to try to permit the indicated number of concurrent updates without contention. Because placement in hash tables is essentially random, the actual concurrency will vary. Ideally, you should choose a value to accommodate as many threads as will ever concurrently modify the table. Using a significantly higher value than you need can waste space and time, and a significantly lower value can lead to thread contention. But overestimates and underestimates within an order of magnitude do not usually have much noticeable impact. A value of one is appropriate when it is known that only one thread will modify and all others will only read. Also, resizing this or any other kind of hash table is a relatively slow operation, so, when possible, it is a good idea to provide estimates of expected table sizes in constructors.
As suggested in comments read carefully the documentation of ConcurrentHashMap, especially when it states about atomic or not atomic operations.
To have the guarantee of atomicity you should consider which operations are atomic, from ConcurrentMap
interface you will know that:
V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)
can be used safely.
Solution 3
I would suggest taking a look at Java's util.concurrent library. I think you can implement this solution a lot cleaner. I don't think you need a map here at all. I would recommend implementing this using the ConcurrentLinkedQueue. Each 'producer' can freely write to this queue without worrying about others. It can put an object on the queue with the data for its statistics.
The harvester can consume the queue continually pulling data off and processsing it. It can then store it however it needs.
Solution 4
Chris Dail's answer looks like a good approach.
Another alternative would be to use a concurrent Multiset
. There is one in the Google Collections library. You could use this as follows:
private Multiset<String> stats = ConcurrentHashMultiset.create();
public void notify ( String key )
{
stats.add(key, 1);
}
Looking at the source, this is implemented using a ConcurrentHashMap
and using putIfAbsent
and the three-argument version of replace
to detect concurrent modifications and retry.
Solution 5
A different approach to the problem is to exploit the (trivial) thread safety via thread confinement. Basically create a single background thread that takes care of both reading and writing. It has a pretty good characteristics in terms of scalability and simplicity.
The idea is that instead of all the threads trying to update the data directly, they produce an "update" task for the background thread to process. The same thread can also do the read task, assuming some lags in processing updates is tolerable.
This design is pretty nice because the threads will no longer have to compete for a lock to update data, and since the map is confined to a single thread you can simply use a plain HashMap to do get/put, etc. In terms of implementation, it would mean creating a single threaded executor, and submitting write tasks which may also perform the optional "collectAndSave" operation.
A sketch of code may look like the following:
public class StatsService {
private ExecutorService executor = Executors.newSingleThreadExecutor();
private final Map<String,Long> stats = new HashMap<String,Long>();
public void notify(final String key) {
Runnable r = new Runnable() {
public void run() {
Long value = stats.get(key);
if (value == null) {
value = 1L;
} else {
value++;
}
stats.put(key, value);
// do the optional collectAndSave periodically
if (timeToDoCollectAndSave()) {
collectAndSave();
}
}
};
executor.execute(r);
}
}
There is a BlockingQueue associated with an executor, and each thread that produces a task for the StatsService uses the BlockingQueue. The key point is this: the locking duration for this operation should be much shorter than the locking duration in the original code, so the contention should be much less. Overall it should result in a much better throughput and latency.
Another benefit is that since only one thread reads and writes to the map, plain HashMap and primitive long type can be used (no ConcurrentHashMap or atomic types involved). This also simplifies the code that actually processes it a great deal.
Hope it helps.
Related videos on Youtube
Janning
I am founder and developer of Kicktipp, a german website for prediction games (Tippspiele). We use Ubuntu, Debian, Linux, Postgresql, Eclipse, Java, Hibernate, Apache, Gimp, and so on...
Updated on April 17, 2022Comments
-
Janning about 2 years
I need to gather some statistics in my software and i am trying to make it fast and correct, which is not easy (for me!)
first my code so far with two classes, a StatsService and a StatsHarvester
public class StatsService { private Map<String, Long> stats = new HashMap<String, Long>(1000); public void notify ( String key ) { Long value = 1l; synchronized (stats) { if (stats.containsKey(key)) { value = stats.get(key) + 1; } stats.put(key, value); } } public Map<String, Long> getStats ( ) { Map<String, Long> copy; synchronized (stats) { copy = new HashMap<String, Long>(stats); stats.clear(); } return copy; } }
this is my second class, a harvester which collects the stats from time to time and writes them to a database.
public class StatsHarvester implements Runnable { private StatsService statsService; private Thread t; public void init ( ) { t = new Thread(this); t.start(); } public synchronized void run ( ) { while (true) { try { wait(5 * 60 * 1000); // 5 minutes collectAndSave(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void collectAndSave ( ) { Map<String, Long> stats = statsService.getStats(); // do something like: // saveRecords(stats); } }
At runtime it will have about 30 concurrent running threads each calling
notify(key)
about 100 times. Only one StatsHarvester is callingstatsService.getStats()
So i have many writers and only one reader. it would be nice to have accurate stats but i don't care if some records are lost on high concurrency.
The reader should run every 5 Minutes or whatever is reasonable.
Writing should be as fast as possible. Reading should be fast but if it locks for about 300ms every 5 minutes, its fine.
I've read many docs (Java concurrency in practice, effective java and so on), but i have the strong feeling that i need your advice to get it right.
I hope i stated my problem clear and short enough to get valuable help.
EDIT
Thanks to all for your detailed and helpful answers. As i expected there is more than one way to do it.
I tested most of your proposals (those i understood) and uploaded a test project to google code for further reference (maven project)
http://code.google.com/p/javastats/
I have tested different implementations of my StatsService
- HashMapStatsService (HMSS)
- ConcurrentHashMapStatsService (CHMSS)
- LinkedQueueStatsService (LQSS)
- GoogleStatsService (GSS)
- ExecutorConcurrentHashMapStatsService (ECHMSS)
- ExecutorHashMapStatsService (EHMSS)
and i tested them with
x
number of Threads each calling notifyy
times, results are in ms10,100 10,1000 10,5000 50,100 50,1000 50,5000 100,100 100,1000 100,5000 GSS 1 5 17 7 21 117 7 37 254 Summe: 466 ECHMSS 1 6 21 5 32 132 8 54 249 Summe: 508 HMSS 1 8 45 8 52 233 11 103 449 Summe: 910 EHMSS 1 5 24 7 31 113 8 67 235 Summe: 491 CHMSS 1 2 9 3 11 40 7 26 72 Summe: 171 LQSS 0 3 11 3 16 56 6 27 144 Summe: 266
At this moment i think i will use ConcurrentHashMap, as it offers good performance while it is quite easy to understand.
Thanks for all your input! Janning
-
matt b about 14 yearsI don't think it is a good idea to change the state of an object while holding a lock on it. Also there should be no need to synchronize the
run()
method inStatsHarvester
. -
Enno Shioji about 14 years
synchronized
keyword in below snipped (in CHMSS) seems to be unnecessary. Unless you are harvesting stats using multiple threads concurrently (obviously, if so that seems odd). Even if you putsynchronized
there, it won't stop threads from calling notify() anyway. If I were you, I'd not bother because stats don't have to be "100%" accurate. If you use CHM, there is no way to lock the entire map. synchronized (stats) { copy = new HashMap<String, Number>(stats); stats.clear(); } return copy;
-
Ben Lings about 14 yearsIt's worth noting that this approach could very easily lead to lost updates.
-
Jack about 14 yearsAccording to Map contract it shouldn't allow lost updates, assuming also that putIfAbsent is executed atomically.
-
Ben Lings about 14 yearsTo ensure that, you'll need to loop until
replace(key, currentValue, currentValue+1)
returns true. -
Ben Lings about 14 years@BalusC: The docs say it returns "the previous value associated with the specified key, or null if there was no mapping for the key". Your one liner will throw a NullPointerException the first time a particular key is notified.
-
BalusC about 14 years@Ben: Yes, you're right (removed wrong comment). Still, this approach is a bit inefficient. I'd rewrite it like follows:
AtomicLong value = stats.get(key); if (value == null) { value = new AtomicLong(0); stats.put(key, value); } value.incrementAndGet();
. Now the map is scanned at least only once instead of at least two times. -
jezg1993 about 14 years@BalausC thats not atomic though. What if stats.put replaces a put by another thread in between your if and stats.put?
-
BalusC about 14 yearsInteresting, thanks for the link. Using
putIfAbsent()
is more a matter of threadsafety, not efficiency. -
matt b about 14 yearsyou might want to consider mentioning
putIfAbsent
in your main post. While the individual operations of ConcurrentHashMap are thread-safe, compound operations naturally are not. -
M. Jessup about 14 yearsIn addition AtomicLong / AtomicInteger should be considered as the map values, this would eliminate the issue of lost increments (except potentially on the initial populations, unless the keys are all pre-populated or a call to putIfAbsent is included as pointed out by matt b.).
-
jezg1993 about 14 years@Mattias you are doing a global lock on all writes then, you will punish the ConcurrentHashMap's throughput. The ConcurrentHashMap was written specifically to allow for concurrent writes on non colliding entries. In best case scenario my solution will only block a single entry once per key.
-
Janning about 14 yearsI do not understand your post. Sorry, but aren't you shifting the problem to another place. When you say "produce an update task" you have to put that in a map or queue whatever data structure you like. And you have to synchronize this.
-
Sean Reilly about 14 yearsAccording to javadoc, you don't need the first call to stats.get or the first null check. putIfAbsent will either return the previous mapped value (in which case you should increment), or null if there was no previous value, in which case the provided AtomicLong(1) was inserted.
-
jezg1993 about 14 yearsYes however, continously calling putIfAbsent is not the correct way of using ConcurrentHashMap. For every single putIfAbsent you will lock a single entry that is associated to the hashCode. You are doing a needless lock where as if you succeed with the initial get you will never block.
-
Janning about 14 yearsyes much clearer. thanks a lot. i upvoted your answer. I am going to test your approach but i think The BlockingQueue must be synchronized in some way. So you move the synchronization from StatsService to the ExecutorService. But i will check it and edit my post to make my results available.
-
Saurabh almost 14 years-1. This is a very inefficient solution. Also you are missing a
put
after the increment. -
sjlee almost 14 years@finnw Thanks for pointing out the error in the code. I corrected it. As for the efficacy of the solution, (along with most things in life) it depends. When compared against using a ConcurrentHashMap (thus no synchronization being done), it does not perform better. However, if the original solution involves synchronization, this is a very useful alternative. Essentially, it's an asynchronous solution to the problem. Obviously it minimizes the latency from the viewpoint of the threads that generate these tasks. Second, it eliminates lock contention associated with the locking involved.
-
jezg1993 over 9 yearsThought you may appreciate this docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/…
-
user2800708 over 9 yearsThanks, I have not really checked out Java 8 yet. Would be interesting to compare with my solution, which is very minimal, to check if it has any extra overhead in comparison. One thing that may differ, the counts for the threads, are they held in ThreadLocal? I avoided that by making each thread explicitly create its own Stats object. Less transparent/simple to use, but avoids the use of ThreadLocal (not that ThreadLocal is so slow any more).
-
user2800708 over 9 yearsThis answer should really get some up-votes, because it is way faster than the other answers. I suppose in many cases extremely light-weight stats gathering may not be so important, as the tasks being measured may be so long and heavy weight. But if you really need your stats gathering to slow down the code as little as possible, this is the way to do it.
-
jezg1993 over 9 yearsWhat you gain in throughput you lose in consistency. A classic trade-off. This flavor of solution would be best if you need an approximate value but not necessarily the exact value at a moment in time. The StampedLock docs even say something similar.
-
user2800708 over 9 yearsIt doesn't matter. You can stop the world with a lock to get an 'exact' value, but the lock will be obtained in a non-deterministic fashion anyway, so the result is no more exact than mine. The important thing is that counts will not be counted twice or missed, and the memory fencing on the volatile ensures the reads and writes to it are consistent. Basically, if you have one thread reading and one writing, you don't need locks or atomics, a volatile is sufficient.
-
SatA over 9 years@johnvint The code you provided has a problem - if
value
isnull
theputIfAbsent
will also returnnull
sovalue
will staynull
. So the code as it is written now is correct but misleading. You either shouldn't assign the return value fromputIfAbsent
intovalue
or you should initializevalue
to some real value. -
jezg1993 over 9 years@SatA I am confused about your point. If the
value
is initially null (which means it doesn't exist yet in the map), we will try to place anew AtomicLong(1)
into the map. The ConcurrentMap will return either null if the put was successful or the value that already existed in the map if the put wasn't successful. So when we get a non-nullvalue
fromputIfAbsent
that means another thread won in executing the put first, so we shouldincrementAndGet
. -
jezg1993 over 9 years
So the code as it is written now is correct but misleading.
I disagree that it is misleading but I guess that's personal preference. -
Victor Khovanskiy over 5 yearsThere is a deadlock when use call r.lock() before w.unlock();