How to safely flush a buffer from a different thread, without synchronized methods?

Developer (开发者) https://www.devze.com 2023-02-10 15:34 Source: the web
There are multiple threads, say B, C and D, each writing small packets of data to a buffer at a high frequency. They own their buffer and nobody else ever writes to it. Writing must be as fast as possible, and I've determined that using synchronized makes it unacceptably slow.

The buffers are simply byte arrays, along with the index of the first free element:

byte[] buffer;
int index;

public void write(byte[] data) {
    // some checking that the buffer won't overflow... not important now
    System.arraycopy(data, 0, buffer, index, data.length);
    index += data.length;
}

Every once in a while, thread A comes along to flush everybody's buffer to a file. It's okay if this part has some overhead, so using synchronized here is no problem.

The trouble is that some other thread might be writing to a buffer while thread A is flushing it. This means that two threads attempt to write to index at around the same time. That would lead to data corruption, which I would like to prevent, but without using synchronized in the write() method.

I've got the feeling that, using the right order of operations and probably some volatile fields, this must be possible. Any bright ideas?


Have you tried a solution which uses synchronization, and found it doesn't perform well enough? You say you've determined that it's unacceptably slow - how slow was it, and do you already have a performance budget? Normally, obtaining an uncontested lock is extremely cheap, so I wouldn't expect it to be a problem.

There may well be some clever lock-free solution - but it's likely to be significantly more complicated than just synchronizing whenever you need to access shared data. I understand that lock-free coding is all the rage, and scales beautifully when you can do it - but if you've got one thread interfering with another's data, it's very hard to do it safely. Just to be clear, I like using lock-free code when I can use high-level abstractions created by experts - things like the Parallel Extensions in .NET 4. I just don't like working with low-level abstractions like volatile variables if I can help it.

Try locking, and benchmark it. Work out what performance is acceptable, and compare the performance of a simple solution with that goal.

Of course, one option is redesigning... does the flushing have to happen actively in a different thread? Could the individual writer threads not just hand off the buffer to the flushing thread (and start a different buffer) periodically? That would make things a lot simpler.

EDIT: Regarding your "flush signal" idea - I'd been thinking along similar lines. But you need to be careful about how you do it so that the signal can't get lost even if one thread takes a long time to process whatever it's doing. I suggest you make thread A publish a "flush counter"... and each thread keeps its own counter of when it last flushed.

EDIT: Just realized this is Java, not C# - updated :)

Use AtomicLong.incrementAndGet() to increment from thread A, and AtomicLong.get() to read from the other threads. Then in each thread, compare whether you're "up to date", and flush if necessary:

private long lastFlush; // Last counter for our flush
private Flusher flusher; // The single flusher used by all threads 

public void write(...)
{
    long latestFlush = flusher.getCount(); // Will use AtomicLong.get() internally
    if (latestFlush > lastFlush)
    {
        flusher.flush(data);
        // Do whatever else you need
        lastFlush = latestFlush; // Don't use flusher.getCount() here!
    }
    // Now do the normal write
}

Note that this assumes you only ever need to check for flushing in the write method. Obviously that may not be the case, but hopefully you can adapt the idea.
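To make the flush-counter idea concrete, here is a minimal sketch. The class names `Flusher` and `CountingWriter`, the `requestFlush()` and `pending()` methods, and the in-memory sink standing in for the file are all assumptions made for illustration; the answer itself only names `flusher.getCount()` and a flush call. It also assumes each writer flushes its own buffer from inside write(), so the buffer and index are still touched by one thread only.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical flusher: thread A calls requestFlush(); writers call getCount() and flush().
class Flusher {
    private final AtomicLong count = new AtomicLong();
    // In-memory sink standing in for the file (assumption for this sketch).
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    void requestFlush() { count.incrementAndGet(); }

    long getCount() { return count.get(); }

    // Rare path, so plain synchronization is acceptable here.
    synchronized void flush(byte[] data, int length) { sink.write(data, 0, length); }

    synchronized byte[] flushed() { return sink.toByteArray(); }
}

// Hypothetical writer: owned by exactly one thread, so buffer and index need no locking.
class CountingWriter {
    private final byte[] buffer = new byte[1024];
    private int index;
    private long lastFlush; // counter value seen at our last flush
    private final Flusher flusher;

    CountingWriter(Flusher flusher) { this.flusher = flusher; }

    void write(byte[] data) {
        long latestFlush = flusher.getCount(); // one volatile read per write
        if (latestFlush > lastFlush) {
            flusher.flush(buffer, index);      // the writer flushes its own buffer
            index = 0;
            lastFlush = latestFlush;           // not getCount(): a newer request must not be lost
        }
        // Overflow checking omitted, as in the question.
        System.arraycopy(data, 0, buffer, index, data.length);
        index += data.length;
    }

    int pending() { return index; }
}
```

The hot path costs one volatile read and a comparison. The trade-off is that a writer which stops calling write() never notices the flush request, which is the limitation noted above.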


You can use volatile alone to safely read/write to a buffer (if you have only one writer); however, only one thread can safely flush the data. To do this you can use a ring buffer.

I would add to @Jon's comment that this is significantly more complicated to test. For example, I had one "solution" which worked consistently for 1 billion messages one day but kept breaking the next because the box was more loaded.

With synchronized your latency should be below 2 microseconds. With Lock, you could get this down to 1 microsecond. With busy waiting on a volatile you can get this down to 3-6 ns per byte (the time it takes to transfer data between threads becomes important).

Note: as the volume of data increases the relative cost of the lock becomes less important. e.g. if you are typically writing 200 bytes or more I wouldn't worry about the difference.

One approach I take is to use the Exchanger with two direct ByteBuffers and avoid writing any data in the critical path (i.e. only write the data after I have processed everything and it doesn't matter so much)
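A rough sketch of the Exchanger approach with two direct ByteBuffers follows. The class name `ExchangerHandoff`, the 64-byte capacity, the byte counter standing in for real file output, and the "empty buffer means stop" convention are all assumptions made for illustration. It also assumes each packet fits in one buffer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicInteger;

class ExchangerHandoff {
    static final int CAPACITY = 64; // assumed buffer size; packets must not exceed it

    // Runs one writer (the calling thread) and one flusher thread;
    // returns the number of bytes the flusher received.
    static int run(int packets, int packetSize) {
        Exchanger<ByteBuffer> exchanger = new Exchanger<>();
        AtomicInteger flushedBytes = new AtomicInteger();

        Thread flusher = new Thread(() -> {
            ByteBuffer spare = ByteBuffer.allocateDirect(CAPACITY);
            try {
                while (true) {
                    ByteBuffer full = exchanger.exchange(spare); // blocks until the writer arrives
                    if (full.position() == 0) return;            // empty buffer is the end marker
                    full.flip();
                    flushedBytes.addAndGet(full.remaining());    // a real flusher would write to a file here
                    full.clear();
                    spare = full;                                // hand it back on the next exchange
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        flusher.start();

        byte[] packet = new byte[packetSize];
        ByteBuffer current = ByteBuffer.allocateDirect(CAPACITY);
        try {
            for (int i = 0; i < packets; i++) {
                if (current.remaining() < packetSize) {
                    current = exchanger.exchange(current);       // swap the full buffer for an empty one
                }
                current.put(packet);                             // plain, unsynchronized write
            }
            if (current.position() > 0) {
                current = exchanger.exchange(current);           // hand over the last partial buffer
            }
            exchanger.exchange(current);                         // send the empty end marker
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flushedBytes.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100, 10)); // prints 1000
    }
}
```

Each buffer is owned by exactly one thread between exchanges, so the writes themselves need no synchronization; the only coordination cost is paid at the swap.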


Invert control. Rather than having A poll the other threads, let them push.

I suppose LinkedBlockingQueue might be the simplest thing to go with.

Pseudocode:

LinkedBlockingQueue<byte[]> jobs; // buffers waiting to be flushed are pushed here
LinkedBlockingQueue<byte[]> pool; // flushed buffers are pushed here for reuse

Output thread (the one that writes to the file):

while (someCondition) {
     byte[] job = jobs.take(); // blocks until a buffer is available
     actualOutput(job);
     pool.offer(job);
}

Other threads:

void flush() {
     // note: the number of valid bytes (index) has to travel with the buffer as well
     jobs.offer(this.buffer);
     this.index = 0;
     this.buffer = pool.poll(); // returns null if no flushed buffer is available
     if (this.buffer == null) 
          this.buffer = createNewBuffer();
}
void write(byte[] data) {
    // some checking that the buffer won't overflow... not important now
    System.arraycopy(data, 0, buffer, index, data.length);
    if ((index += data.length) > threshold) 
         this.flush();
}

LinkedBlockingQueue basically encapsulates the technical means to pass messages safely between threads.
Not only is it simpler this way round, but it also clearly separates concerns, because the threads that actually generate the output determine when they want to flush their buffers, and they are the only ones to maintain their own state.
The buffers that are in both queues present a memory overhead, but that should be acceptable. The pool is unlikely to grow significantly bigger than the total number of threads, and unless the actual output is a bottleneck, the jobs queue should be empty most of the time.
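The pseudocode above can be turned into something runnable roughly as follows. This is a sketch under assumptions: the `Chunk` type (a buffer plus its count of valid bytes), the `BufferHandoff` and `Writer` names, the sizes, and the non-blocking `drain` method are all invented for illustration. A real output thread would loop on `jobs.take()` instead of polling.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BufferHandoff {
    // Hypothetical job type: a buffer plus the number of valid bytes in it.
    static final class Chunk {
        final byte[] data = new byte[64];
        int length;
    }

    static final int THRESHOLD = 48; // flush once a chunk holds more than this many bytes

    final BlockingQueue<Chunk> jobs = new LinkedBlockingQueue<>(); // filled chunks awaiting output
    final BlockingQueue<Chunk> pool = new LinkedBlockingQueue<>(); // flushed chunks for reuse

    // One Writer per producing thread; only that thread touches `current`.
    final class Writer {
        private Chunk current = new Chunk();

        void write(byte[] packet) {
            // Overflow checking omitted, as in the question.
            System.arraycopy(packet, 0, current.data, current.length, packet.length);
            current.length += packet.length;
            if (current.length > THRESHOLD) flush();
        }

        void flush() {
            jobs.offer(current);      // the queue publishes the chunk safely to the output thread
            Chunk next = pool.poll(); // reuse a flushed chunk if one is available
            current = (next != null) ? next : new Chunk();
        }
    }

    Writer newWriter() { return new Writer(); }

    // Output side: writes every queued chunk to the sink, then recycles it.
    // Returns the number of chunks written.
    int drain(ByteArrayOutputStream sink) {
        int chunks = 0;
        for (Chunk job; (job = jobs.poll()) != null; chunks++) {
            sink.write(job.data, 0, job.length);
            job.length = 0;
            pool.offer(job);
        }
        return chunks;
    }
}
```

Carrying the length with the buffer matters: the output thread otherwise cannot tell how much of the array is valid.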


Volatile Variables And A Circular Buffer

Use a circular buffer, and make the flushing thread "chase" the writes around the buffer instead of resetting the index to zero after each flush. This allows writes to occur during a flush without any locking.

Use two volatile variables - writeIndex for where the writing thread is up to, and flushIndex for where the flushing thread is up to. These variables are each updated by only one thread, and can be read atomically by the other thread. Use these variables to keep the threads constrained to separate sections of the buffer. Do not allow the flushing thread to go past where the writing thread is up to (i.e. flush an unwritten part of the buffer). Do not allow the writing thread to go past where the flushing thread is up to (i.e. overwrite an unflushed part of the buffer).

Writing thread loop:

  • Read writeIndex (atomic)
  • Read flushIndex (atomic)
  • Check that this write will not overwrite unflushed data
  • Write to the buffer
  • Calculate the new value for writeIndex
  • Set writeIndex (atomic)

Flushing thread loop:

  • Read writeIndex (atomic)
  • Read flushIndex (atomic)
  • Flush the buffer from flushIndex to writeIndex - 1
  • Set flushIndex (atomic) to the value that was read for writeIndex

But, WARNING: for this to work, the buffer array elements might also need to be volatile, which you can't do in Java (yet). See http://jeremymanson.blogspot.com/2009/06/volatile-arrays-in-java.html

Nevertheless, here's my implementation (changes are welcome):

volatile int writeIndex = 0;
volatile int flushIndex = 0;
byte[] buffer = new byte[268435456];

public void write(byte[] data) throws Exception {
    int localWriteIndex = writeIndex; // volatile read
    int localFlushIndex = flushIndex; // volatile read

    int freeBuffer = buffer.length - (localWriteIndex - localFlushIndex +
        buffer.length) % buffer.length;

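    // Keep one byte free: if writeIndex caught up with flushIndex,
    // a completely full buffer would look the same as an empty one.
    if (data.length >= freeBuffer)
        throw new Exception("Buffer overflow");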

    if (localWriteIndex + data.length <= buffer.length) {
        System.arraycopy(data, 0, buffer, localWriteIndex, data.length);
        writeIndex = localWriteIndex + data.length;
    }
    else
    {
        int firstPartLength = buffer.length - localWriteIndex;
        int secondPartLength = data.length - firstPartLength;

        System.arraycopy(data, 0, buffer, localWriteIndex, firstPartLength);
        System.arraycopy(data, firstPartLength, buffer, 0, secondPartLength);

        writeIndex = secondPartLength;
    }
}

public byte[] flush() {
    int localWriteIndex = writeIndex; // volatile read
    int localFlushIndex = flushIndex; // volatile read

    int usedBuffer = (localWriteIndex - localFlushIndex + buffer.length) %
        buffer.length;
    byte[] output = new byte[usedBuffer];

    if (localFlushIndex + usedBuffer <= buffer.length) {
        System.arraycopy(buffer, localFlushIndex, output, 0, usedBuffer);
        flushIndex = localFlushIndex + usedBuffer;
    }
    else {
        int firstPartLength = buffer.length - localFlushIndex;
        int secondPartLength = usedBuffer - firstPartLength;

        System.arraycopy(buffer, localFlushIndex, output, 0, firstPartLength);
        System.arraycopy(buffer, 0, output, firstPartLength, secondPartLength);

        flushIndex = secondPartLength;
    }

    return output;
}


Perhaps:

import java.util.concurrent.atomic.AtomicInteger;

byte[] buffer;
AtomicInteger index = new AtomicInteger();

public void write(byte[] data) {
    // some checking that the buffer won't overflow... not important now
    int start = index.get(); // only the owning thread advances index
    System.arraycopy(data, 0, buffer, start, data.length);
    index.addAndGet(data.length);
}

public int getIndex() {
    return index.get();
}

Otherwise, the lock classes in the java.util.concurrent.locks package are more lightweight than the synchronized keyword...

so:

byte[] buffer;
int index;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

public void write(byte[] data) {
    lock.writeLock().lock();
    try {
        // some checking that the buffer won't overflow... not important now
        System.arraycopy(data, 0, buffer, index, data.length);
        index += data.length;
    } finally {
        lock.writeLock().unlock(); // always release, even if the copy throws
    }
}

and in the flushing thread:

object.lock.readLock().lock(); 
// flush the buffer      
object.index = 0;                     
object.lock.readLock().unlock();

UPDATE:
The pattern you describe for reading and writing to the buffer will not benefit from using a ReadWriteLock implementation, so just use a plain ReentrantLock:

final int SIZE = 99;
byte[] buffer = new byte[SIZE];
int index;
// Use default non-fair lock to maximise throughput (although some writer threads may wait longer)
ReentrantLock lock = new ReentrantLock();

// called by many threads
public void write(byte[] data) {
    lock.lock();
    try {
        // some checking that the buffer won't overflow... not important now
        System.arraycopy(data, 0, buffer, index, data.length);
        index += data.length;
    } finally {
        lock.unlock(); // always release, even if the copy throws
    }
}

// Only called by 1 thread - or implemented in only 1 thread:
public byte[] flush() {
    lock.lock();
    try {
        byte[] rval = new byte[index]; // read index only while holding the lock
        System.arraycopy(buffer, 0, rval, 0, index);
        index = 0;
        return rval;
    } finally {
        lock.unlock();
    }
}

As you describe the usage as many writer threads with a single reader/flusher thread, a ReadWriteLock is not necessary. In fact, I believe it is more heavyweight than a simple ReentrantLock (?). ReadWriteLocks are useful for many reader threads with few writer threads - the opposite of the situation you describe.


You can try implementing semaphores.
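The answer gives no detail, so the following is only one possible reading: a binary `java.util.concurrent.Semaphore` used as a mutual-exclusion guard around the buffer. The class name `SemaphoreBuffer` and the buffer size are assumptions. Note that this is not lock-free; it behaves like a lock, so it should be benchmarked against synchronized before assuming it is faster.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class SemaphoreBuffer {
    private final byte[] buffer = new byte[1024];
    private int index;
    private final Semaphore permit = new Semaphore(1); // binary semaphore: one holder at a time

    // Called by the owning writer thread.
    void write(byte[] data) {
        permit.acquireUninterruptibly();
        try {
            // Overflow checking omitted, as in the question.
            System.arraycopy(data, 0, buffer, index, data.length);
            index += data.length;
        } finally {
            permit.release();
        }
    }

    // Called by the flushing thread; returns the buffered bytes and resets the index.
    byte[] flush() {
        permit.acquireUninterruptibly();
        try {
            byte[] out = Arrays.copyOf(buffer, index);
            index = 0;
            return out;
        } finally {
            permit.release();
        }
    }
}
```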


I like the lock-free stuff, it's addictive :). And rest assured: lock-free structures remove a lot of the shortcomings of locking, at the cost of a steep learning curve. Still, they are error-prone.

Read a few articles, perhaps a book, and try it at home first. How to handle your case? You can't atomically copy data (and update the size), but you can atomically update a reference to that data.
A simple way to do it follows. Note: you can ALWAYS read from the buffer without holding a lock, which is the entire point.

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

final AtomicReference<byte[]> buffer=new AtomicReference<byte[]>(new byte[0]);
void write(byte[] b){
    for(;;){
        final byte[] cur = buffer.get();
        final byte[] copy = Arrays.copyOf(cur, cur.length+b.length);
        System.arraycopy(b, 0, copy, cur.length, b.length);//append into the copy, never into cur
        if (buffer.compareAndSet(cur, copy)){
            break;
        }
        //the CAS failed, so there was a concurrent write:
        //loop and append at the end of the new buffer; writes can then land out of order,
        //just as with synchronized
    }
}

Actually, you can still use a larger byte[] and append to it, but I leave that as an exercise for you.


Continued

I had to write the code in a pinch. A short description follows: the code is lock-free but not wait-free, due to the use of the CLQ (ConcurrentLinkedQueue). As you can see, the code always continues regardless of which branch is taken and practically doesn't loop (busy wait) anywhere besides the CLQ itself.

Many lock-free algorithms rely on the help of all the threads to properly finish the task(s). There might be some mistake but I hope the main idea is sort of clear:

  • The algorithm allows many writers and many readers.
  • If a writer cannot change the main state (the state admits only a single writer at a time), it appends its byte[] to a queue.
  • Any writer that succeeded on the CAS must attempt to flush the queue before writing its own data.
  • A reader must check for pending writes and flush them before using the main buffer.
  • If the buffer has to be enlarged (the current byte[] is not big enough), the buffer and its size must be thrown away and a new generation of Buffer+Size is used. Otherwise only the size is increased. This operation again requires holding the "lock" (i.e. the CAS succeeded).

Any feedback is welcome. Cheers, and hopefully people can warm up to lock-free structures and algorithms.

package bestsss.util;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

//the code uses ConcurrentLinkedQueue to simplify the implementation
//the class is well known and the main point is to demonstrate the lock-free stuff
public class TheBuffer{
    //buffer generation, if the room is exhausted need to update w/ a new reference
    private static class BufGen{
        final byte[] data;
        volatile int size;

        BufGen(int capacity, int size, byte[] src){
            this.data = Arrays.copyOf(src, capacity);
            this.size  = size;
        }

        BufGen append(byte[] b){
            int s = this.size;
            int newSize = b.length+s;
            BufGen target;
            if (newSize>data.length){
                int cap = Integer.highestOneBit(newSize)<<1;
                if (cap<0){
                    cap = Integer.MAX_VALUE;                    
                }               
                target = new BufGen(cap, this.size, this.data);             
            } 
            else if(newSize<0){//overflow 
                throw new IllegalStateException("Buffer overflow - over int size");
            } else{ 
                target = this;//if there is enough room(-service), reuse the buffer
            }
            System.arraycopy(b, 0, target.data, s, b.length);
            target.size = newSize;//'commit' the changes: update the size after the copy, so both become visible at the same time
            //that's the volatile write I was talking about
            return target;
        }       
    }

    private volatile BufGen buffer = new BufGen(16,0,new byte[0]);

    //read consist of 3 volatile reads most of the time, can be 2 if BufGen is recreated each time
    public byte[] read(int[] targetSize){//ala AtomicStampedReference
        if (!pendingWrites.isEmpty()){//optimistic check, do not grab the lock and just do a volatile-read
            //that will serve 99%++ of the cases
            doWrite(null, READ);//yet something in the queue, help the writers
        }
        BufGen buffer = this.buffer;
        targetSize[0]=buffer.size;
        return  buffer.data;
    }
    public void write(byte[] b){
        doWrite(b, WRITE);
    }

    private static final int FREE = 0;
    private static final int WRITE = 1;
    private static final int READ= 2;

    private final AtomicInteger state = new AtomicInteger(FREE);
    private final ConcurrentLinkedQueue<byte[]> pendingWrites=new ConcurrentLinkedQueue<byte[]>();
    private void doWrite(byte[] b, int operation) {
        if (state.compareAndSet(FREE, operation)){//won the CAS hurray!
            //now the state is held "exclusive"
            try{
                //1st be nice and poll the queue, that gives a fast track to the losers
                //(we're too nice)
                BufGen buffer = this.buffer;
                for(byte[] pending; null!=(pending=pendingWrites.poll());){
                    buffer = buffer.append(pending);//do not update the global buffer yet
                }
                if (b!=null){
                    buffer = buffer.append(b);
                }
                this.buffer = buffer;//volatile write and make sure any data is updated
            }finally{
                state.set(FREE);
            }
        } 
        else{//we lost the CAS, well someone must take care of the pending operation 
            if (b==null)
                return;

            pendingWrites.add(b);           
        }
    }


    public static void main(String[] args) {
        //usage only, not a test for concurrency correctness
        TheBuffer buf = new TheBuffer();        
        buf.write("X0X\n".getBytes());
        buf.write("XXXXXXXXXXAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAXXXXXXXXXXXXXXXXXXX\n".getBytes());
        buf.write("Hello world\n".getBytes());
        int[] size={0};
        byte[] bytes = buf.read(size);
        System.out.println(new String(bytes, 0, size[0]));
    }
}

Simplistic case

Another, far simpler solution that allows many writers but only a single reader. It postpones the writes into a CLQ and the reader just reconstructs them. The construction code is omitted this time.

package bestsss.util;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TheSimpleBuffer {
    private final ConcurrentLinkedQueue<byte[]> writes =new ConcurrentLinkedQueue<byte[]>();
    public void write(byte[] b){
        writes.add(b);
    }

    private byte[] buffer;
    public byte[] read(int[] targetSize){
        ArrayList<byte[]> copy = new ArrayList<byte[]>(12);
        int len = 0;
        for (byte[] b; null!=(b=writes.poll());){
            copy.add(b);
            len+=b.length;
            if (len<0){//cant return this big, overflow 
                len-=b.length;//fix back;
                break;
            }
        }
        //copy, to the buffer, create new etc....
        //...

        ///
        targetSize[0]=len;
        return buffer; 
    }

}
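Since the construction step is left out above, here is one way it might look. This is a sketch, not the author's code: the class name `DrainingBuffer`, the `drain()` method, and the use of `ByteArrayOutputStream` to concatenate the pending arrays are assumptions. It keeps the same contract of many writers and a single reader.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.ConcurrentLinkedQueue;

class DrainingBuffer {
    private final ConcurrentLinkedQueue<byte[]> writes = new ConcurrentLinkedQueue<>();

    // Safe to call from any number of threads. The caller must not modify
    // the array after handing it over, because it is queued without copying.
    void write(byte[] b) {
        writes.add(b);
    }

    // Single reader only: concatenates everything written so far, in queue order.
    byte[] drain() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] b; (b = writes.poll()) != null; ) {
            out.write(b, 0, b.length);
        }
        return out.toByteArray();
    }
}
```

The writers pay only for one queue insertion per packet; all copying happens on the reader's side, where some overhead is acceptable according to the question.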