Java concurrency: many writers, one reader

Developer https://www.devze.com 2022-12-25 13:58 Source: Internet
I need to gather some statistics in my software, and I am trying to make it fast and correct, which is not easy (for me!).

First, my code so far, with two classes: a StatsService and a StatsHarvester.

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1L;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}

This is my second class, a harvester that collects the stats from time to time and writes them to a database.

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

At runtime there will be about 30 concurrently running threads, each calling notify(key) about 100 times. Only one StatsHarvester calls statsService.getStats().

So I have many writers and only one reader. It would be nice to have accurate stats, but I don't mind if some records are lost under high concurrency.

The reader should run every 5 minutes, or whatever is reasonable.

Writing should be as fast as possible. Reading should be fast, but if it locks for about 300 ms every 5 minutes, that's fine.

I've read many docs (Java Concurrency in Practice, Effective Java and so on), but I have the strong feeling that I need your advice to get it right.

I hope I have stated my problem clearly and briefly enough to get valuable help.


EDIT

Thanks to all for your detailed and helpful answers. As I expected, there is more than one way to do it.

I tested most of your proposals (those I understood) and uploaded a test project to Google Code for further reference (Maven project):

http://code.google.com/p/javastats/

I have tested different implementations of my StatsService:

  • HashMapStatsService (HMSS)
  • ConcurrentHashMapStatsService (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

I tested them with x threads, each calling notify y times (column headers are x,y). Results are in ms.

          10,100  10,1000  10,5000  50,100  50,1000  50,5000  100,100  100,1000  100,5000  Sum
GSS       1       5        17       7       21       117      7        37        254       466
ECHMSS    1       6        21       5       32       132      8        54        249       508
HMSS      1       8        45       8       52       233      11       103       449       910
EHMSS     1       5        24       7       31       113      8        67        235       491
CHMSS     1       2        9        3       11       40       7        26        72        171
LQSS      0       3        11       3       16       56       6        27        144       266

At the moment I think I will use ConcurrentHashMap, as it offers good performance while being quite easy to understand.

Thanks for all your input! Janning


As jack was alluding to, you can use the java.util.concurrent library, which includes ConcurrentHashMap and AtomicLong. Put an AtomicLong into the map if the key is absent; otherwise increment the existing value. Since AtomicLong is thread-safe, you can increment the counter without worrying about concurrency issues.

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

This should be both fast and thread-safe.

Edit: Refactored slightly so there are at most two lookups.
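The notify method above covers only the writers. A minimal sketch of how the harvesting side might fit with it follows; the class name AtomicCounterStats and the choice to drain counters with getAndSet(0) rather than clear the map are assumptions, not part of the answer. Draining keeps every key in the map, which avoids the case where a writer increments a counter that has just been removed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class name; notify/getStats mirror the question's StatsService.
class AtomicCounterStats {
    private final ConcurrentMap<String, AtomicLong> stats =
            new ConcurrentHashMap<String, AtomicLong>();

    public void notify(String key) {
        AtomicLong value = stats.get(key);
        if (value == null) {
            // Returns the counter another thread stored first, or null if ours went in.
            value = stats.putIfAbsent(key, new AtomicLong(1));
        }
        if (value != null) {
            value.incrementAndGet();
        }
    }

    // Called by the single harvester: read and reset each counter atomically.
    // Counters are reset rather than removed, so no increment lands on a
    // discarded AtomicLong.
    public Map<String, Long> getStats() {
        Map<String, Long> copy = new HashMap<String, Long>();
        for (Map.Entry<String, AtomicLong> e : stats.entrySet()) {
            long n = e.getValue().getAndSet(0);
            if (n > 0) {
                copy.put(e.getKey(), n);
            }
        }
        return copy;
    }
}
```

The trade-off is that keys are never removed, so this only suits a bounded key set.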


Why don't you use java.util.concurrent.ConcurrentHashMap<K, V>? It handles everything internally, avoiding unnecessary locks on the map and saving you a lot of work: you won't have to take care of synchronization on get and put.

From the documentation:

A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.

You can specify its concurrency level:

The allowed concurrency among update operations is guided by the optional concurrencyLevel constructor argument (default 16), which is used as a hint for internal sizing. The table is internally partitioned to try to permit the indicated number of concurrent updates without contention. Because placement in hash tables is essentially random, the actual concurrency will vary. Ideally, you should choose a value to accommodate as many threads as will ever concurrently modify the table. Using a significantly higher value than you need can waste space and time, and a significantly lower value can lead to thread contention. But overestimates and underestimates within an order of magnitude do not usually have much noticeable impact. A value of one is appropriate when it is known that only one thread will modify and all others will only read. Also, resizing this or any other kind of hash table is a relatively slow operation, so, when possible, it is a good idea to provide estimates of expected table sizes in constructors.

As suggested in the comments, read the documentation of ConcurrentHashMap carefully, especially where it describes which operations are atomic and which are not.

To guarantee atomicity you need to know which operations are atomic. From the ConcurrentMap interface you will see that:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

can be used safely.
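A get followed by a put is not atomic as a pair, even on a ConcurrentHashMap, so a counter built from plain Long values needs a retry loop around the atomic operations listed above. A minimal sketch, with CasCounter as a hypothetical class name:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical class name; counts with immutable Long values and a retry loop.
class CasCounter {
    private final ConcurrentMap<String, Long> stats =
            new ConcurrentHashMap<String, Long>();

    public void notify(String key) {
        while (true) {
            Long old = stats.get(key);
            if (old == null) {
                // Succeeds only if no other thread inserted the key in the meantime.
                if (stats.putIfAbsent(key, 1L) == null) {
                    return;
                }
            } else if (stats.replace(key, old, old + 1)) {
                // Succeeds only if the value is still equal to the one we read.
                return;
            }
            // Another thread changed the entry first: read again and retry.
        }
    }

    public Long get(String key) {
        return stats.get(key);
    }
}
```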


I would suggest taking a look at Java's java.util.concurrent library. I think you can implement this solution much more cleanly, and I don't think you need a map here at all. I would recommend using a ConcurrentLinkedQueue. Each 'producer' can freely write to this queue without worrying about the others, putting an object on the queue that carries the data for its statistics.

The harvester can consume the queue, continually pulling data off and processing it. It can then store it however it needs.
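A minimal sketch of this queue-based design, assuming the queued object is simply the key string and that aggregation happens in the harvester; QueueStats is a hypothetical class name:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name; writers only enqueue, the harvester aggregates.
class QueueStats {
    private final Queue<String> events = new ConcurrentLinkedQueue<String>();

    // Called by many writer threads; lock-free enqueue.
    public void notify(String key) {
        events.offer(key);
    }

    // Called by the single harvester thread: drain the queue and count per key.
    // The local HashMap needs no synchronization because only this thread sees it.
    public Map<String, Long> getStats() {
        Map<String, Long> counts = new HashMap<String, Long>();
        String key;
        while ((key = events.poll()) != null) {
            Long old = counts.get(key);
            counts.put(key, old == null ? 1L : old + 1);
        }
        return counts;
    }
}
```

The queue is unbounded, so memory use grows with the number of events between two harvests.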


Chris Dail's answer looks like a good approach.

Another alternative would be to use a concurrent Multiset. There is one in the Google Collections library. You could use this as follows:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

Looking at the source, this is implemented using a ConcurrentHashMap and using putIfAbsent and the three-argument version of replace to detect concurrent modifications and retry.


A different approach to the problem is to exploit the (trivial) thread safety of thread confinement. Basically, create a single background thread that takes care of both reading and writing. It has pretty good characteristics in terms of scalability and simplicity.

The idea is that instead of all the threads trying to update the data directly, they produce an "update" task for the background thread to process. The same thread can also do the read task, assuming some lag in processing updates is tolerable.

This design is pretty nice because the threads no longer have to compete for a lock to update the data, and since the map is confined to a single thread you can simply use a plain HashMap for get/put, etc. In terms of implementation, it means creating a single-threaded executor and submitting write tasks, which may also perform the optional "collectAndSave" operation.

A sketch of code may look like the following:

public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

There is a BlockingQueue associated with an executor, and each thread that produces a task for the StatsService uses the BlockingQueue. The key point is this: the locking duration for this operation should be much shorter than the locking duration in the original code, so the contention should be much less. Overall it should result in a much better throughput and latency.

Another benefit is that since only one thread reads and writes to the map, plain HashMap and primitive long type can be used (no ConcurrentHashMap or atomic types involved). This also simplifies the code that actually processes it a great deal.
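One possible variation on the sketch above, offered as an assumption rather than as part of the original answer: instead of checking a timer inside every write task, the harvester can submit its own read task to the same executor. The executor runs tasks in submission order, so the read task sees every update queued before it. ConfinedStats and shutdown are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name; the map is touched only by the executor's thread.
class ConfinedStats {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String, Long> stats = new HashMap<String, Long>();

    public void notify(final String key) {
        executor.execute(new Runnable() {
            public void run() {
                Long value = stats.get(key);
                stats.put(key, value == null ? 1L : value + 1);
            }
        });
    }

    // Runs the copy-and-clear on the executor thread and waits for the result.
    public Map<String, Long> getStats() {
        try {
            return executor.submit(new Callable<Map<String, Long>>() {
                public Map<String, Long> call() {
                    Map<String, Long> copy = new HashMap<String, Long>(stats);
                    stats.clear();
                    return copy;
                }
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // The executor thread is not a daemon, so it must be shut down explicitly.
    public void shutdown() {
        executor.shutdown();
    }
}
```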

Hope it helps.


Have you looked into ScheduledThreadPoolExecutor? You could use it to schedule your writers, which could all write to a concurrent collection such as the ConcurrentLinkedQueue mentioned by @Chris Dail. You can have a separately scheduled job read from the queue as necessary, and the Java SDK should handle pretty much all your concurrency concerns, with no manual locking needed.
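As a rough sketch of the scheduling part only, the wait() loop in the question's StatsHarvester could be replaced by a scheduled executor. ScheduledHarvester, start and stop are hypothetical names, and the Runnable passed in stands for the question's collectAndSave step.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; runs a harvest task at a fixed rate.
class ScheduledHarvester {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start(final Runnable collectAndSave, long period, TimeUnit unit) {
        Runnable safe = new Runnable() {
            public void run() {
                try {
                    collectAndSave.run();
                } catch (RuntimeException e) {
                    // scheduleAtFixedRate suppresses all later runs if one run
                    // throws, so failures are caught and logged here.
                    e.printStackTrace();
                }
            }
        };
        scheduler.scheduleAtFixedRate(safe, period, period, unit);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

For the question's interval this would be started with something like start(task, 5, TimeUnit.MINUTES).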


If we ignore the harvesting part and focus on the writing, the main bottleneck of the program is that the stats are locked at a very coarse level of granularity. If two threads want to update different keys, they must wait.

If you know the set of keys in advance, and can preinitialize the map so that by the time an update thread arrives the key is guaranteed to exist, you would be able to do locking on the accumulator variable instead of the whole map, or use a thread-safe accumulator object.

Instead of implementing this yourself, there are map implementations that are designed specifically for concurrency and do this more fine-grained locking for you.

One caveat, though, is the harvesting, since you would need to get locks on all the accumulators at roughly the same time. If you use an existing concurrency-friendly map, there might be a construct for getting a snapshot.
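A minimal sketch of the pre-initialized variant, assuming the full key set is known at construction time; FixedKeyStats is a hypothetical class name. Because the map is fully built in the constructor, stored in a final field and never modified afterwards, a plain HashMap can be read concurrently, and only the per-key AtomicLong accumulators are contended.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class name; one thread-safe accumulator per known key.
class FixedKeyStats {
    private final Map<String, AtomicLong> stats;

    FixedKeyStats(Collection<String> knownKeys) {
        Map<String, AtomicLong> m = new HashMap<String, AtomicLong>();
        for (String key : knownKeys) {
            m.put(key, new AtomicLong());
        }
        // Never modified after construction, so concurrent reads are safe.
        stats = Collections.unmodifiableMap(m);
    }

    public void notify(String key) {
        AtomicLong counter = stats.get(key);
        if (counter != null) {   // unknown keys are ignored in this sketch
            counter.incrementAndGet();
        }
    }

    // Each counter is read and reset atomically, but the result as a whole is
    // not one consistent snapshot: counters are visited one after another.
    public Map<String, Long> getStats() {
        Map<String, Long> copy = new HashMap<String, Long>();
        for (Map.Entry<String, AtomicLong> e : stats.entrySet()) {
            copy.put(e.getKey(), e.getValue().getAndSet(0));
        }
        return copy;
    }
}
```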


Another alternative is to implement both methods using ReentrantReadWriteLock. This implementation protects against race conditions in the getStats method if you need to clear the counters. It also keeps the mutable AtomicLong out of the getStats result and uses an immutable Long instead.

public class StatsService {

    private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000);
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public void  notify(final String key) {
        r.lock();
        AtomicLong count = stats.get(key);
        if (count == null) {
            r.unlock();
            w.lock();
            count = stats.get(key);
            if(count == null) { 
                count = new AtomicLong();
                stats.put(key, count);
            }
            r.lock();
            w.unlock();
        }
        count.incrementAndGet();
        r.unlock();
    }

    public Map<String, Long> getStats() {
        w.lock();

        Map<String, Long> copy = new HashMap<String, Long>();
        for(Entry<String,AtomicLong> entry : stats.entrySet() ){
                copy.put(entry.getKey(), entry.getValue().longValue());
        }
        stats.clear();
        w.unlock();

        return copy;
    }
}

I hope this helps, any comments are welcome!


Here is how to do it with minimal impact on the performance of the threads being measured. This is the fastest solution possible in Java, without resorting to special hardware registers for performance counting.

Have each thread output its stats independently of the others, that is with no synchronization, to some stats object. Make the field containing the count volatile, so it is memory fenced:

class Stats
{
   public volatile long count;
}

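class SomeRunnable implements Runnable
{
   // each runnable owns its Stats object, so only this thread writes to it
   final Stats stats = new Stats();

   public void run()
   {
     doStuff();
     stats.count++; // not atomic, but safe with a single writer per Stats
   }
}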

Have another thread, that holds a reference to all the Stats objects, periodically go around them all and add up the counts across all threads:

public long accumulateStats()
{
   long count = 0; // sum of the current totals; previousCount holds the last sum

   for (Stats stat : allStats)
   {
       count += stat.count;
   }

   long resultDelta = count - previousCount;
   previousCount = count;

   return resultDelta;
}

This gatherer thread also needs a sleep() (or some other throttle) added to it. It can periodically output counts/sec to the console for example, to give you a "live" view of how your application is performing.

This avoids the synchronization overhead about as much as you can.

The other trick to consider is padding the Stats objects to 128 bytes (or 256 bytes on Sandy Bridge or later), so as to keep different threads' counts on different cache lines; otherwise there will be cache contention on the CPU.

When only one thread writes and one reads, you do not need locks or atomics; a volatile is sufficient. There will still be some contention when the stats reader thread touches the CPU cache line of the thread being measured. This cannot be avoided, but it is the way to do it with minimal impact on the running thread; read the stats maybe once a second or less often.
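Putting the pieces of this approach together, a minimal sketch might look like the following. ThreadStats, StatsGatherer and register are hypothetical names, the CopyOnWriteArrayList registry is an assumption about how the gatherer learns of each thread's counter, and the cache-line padding is left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical name; one instance per measured thread.
class ThreadStats {
    // Written by exactly one thread, read by the gatherer thread.
    volatile long count;
}

// Hypothetical name; used by the single gatherer thread.
class StatsGatherer {
    private final List<ThreadStats> allStats = new CopyOnWriteArrayList<ThreadStats>();
    private long previousCount; // touched only by the gatherer thread

    // Each measured thread registers once and keeps its own ThreadStats.
    public ThreadStats register() {
        ThreadStats s = new ThreadStats();
        allStats.add(s);
        return s;
    }

    // Returns how many events were counted since the previous call.
    public long accumulateStats() {
        long count = 0;
        for (ThreadStats s : allStats) {
            count += s.count;
        }
        long resultDelta = count - previousCount;
        previousCount = count;
        return resultDelta;
    }
}
```

Each measured thread calls register() once and then does count++ on its own object; the increment is not atomic, so the scheme is only correct while every ThreadStats has a single writer.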
