I'm trying to find a lock-free or non-blocking way to build a ring buffer for a single producer / single consumer that will overwrite the oldest data in the buffer. I've read a lot of lock-free algorithms that work when you "return false" if the buffer is full (i.e., don't add), but I can't find even pseudo-code that talks about how to do it when you need to overwrite the oldest data.
I am using GCC 4.1.2 (a restriction at work, I can't upgrade the version...) and I have the Boost libraries. In the past I made my own Atomic< T > variable type that follows the upcoming specification pretty closely (it's not perfect, but it is thread-safe and does what I need).
When I thought about it, I figured using these atomics should really take care of the problem. Some rough pseudo-code of what I was thinking:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { /* create array of size, set readIndex = writeIndex = 0 */ }
~RingBuffer() { /* delete data */ }
void produce(const T& t)
{
if(getNextIndex(writeIndex) == readIndex) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNextIndex(readIndex); //5
return true;
}
};
As far as I can tell, there are no deadlock situations here, so we're safe from that (if my implementation above is wrong even at the pseudo-code level, constructive criticism is always appreciated). However, the BIG race condition I can find is:
Let's assume the buffer is full, that is, writeIndex + 1 == readIndex. (1) occurs just as consume is being called, and is true. (4) is false, so we move on to read from the buffer. (5) occurs, and readIndex is advanced by one (so there is, in fact, space in the buffer). (2) then occurs, advancing readIndex AGAIN, thus LOSING the value.
Basically, it's the classic problem of the writer having to modify the reader's index, causing a race condition. Without actually blocking the entire list every time I access it, I can't think of a way to prevent this from happening. What am I missing??
- Start with a single producer/multiple consumer queue with appropriate progress guarantees.
- If the queue is full and the push would fail, pop one value. Then there will be space to push the new value.
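A minimal sketch of that idea, assuming C++11 std::atomic is available (it is not in GCC 4.1.2, so the asker's own Atomic< T > would have to stand in). BoundedQueue and push_overwrite are hypothetical names. The queue uses the well-known per-slot sequence number design for bounded queues, picked here only because its try_pop tolerates being called from both the consumer and the producer:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical bounded queue: each slot carries a sequence number, so
// try_pop may be called concurrently from more than one thread.
template <typename T, std::size_t N>
class BoundedQueue {
    static_assert(N >= 2, "this design needs at least two slots");
    struct Slot { std::atomic<std::size_t> seq; T value; };
    Slot slots_[N];
    std::atomic<std::size_t> head_{0};  // next position to pop
    std::atomic<std::size_t> tail_{0};  // next position to push
public:
    BoundedQueue() {
        for (std::size_t i = 0; i < N; ++i)
            slots_[i].seq.store(i, std::memory_order_relaxed);
    }

    bool try_push(const T& v) {
        std::size_t pos = tail_.load(std::memory_order_relaxed);
        for (;;) {
            Slot& s = slots_[pos % N];
            std::size_t seq = s.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) {            // slot is free for this position
                if (tail_.compare_exchange_weak(pos, pos + 1,
                                                std::memory_order_relaxed)) {
                    s.value = v;
                    s.seq.store(pos + 1, std::memory_order_release);
                    return true;
                }
            } else if (diff < 0) {      // slot still holds an unread value
                return false;           // queue is full
            } else {
                pos = tail_.load(std::memory_order_relaxed);
            }
        }
    }

    bool try_pop(T& out) {
        std::size_t pos = head_.load(std::memory_order_relaxed);
        for (;;) {
            Slot& s = slots_[pos % N];
            std::size_t seq = s.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0) {            // slot holds the value for this position
                if (head_.compare_exchange_weak(pos, pos + 1,
                                                std::memory_order_relaxed)) {
                    out = s.value;
                    s.seq.store(pos + N, std::memory_order_release);
                    return true;
                }
            } else if (diff < 0) {      // nothing published here yet
                return false;           // queue is empty
            } else {
                pos = head_.load(std::memory_order_relaxed);
            }
        }
    }
};

// Producer side: if the push fails because the queue is full, pop (and
// discard) the oldest value, then try again.
template <typename T, std::size_t N>
void push_overwrite(BoundedQueue<T, N>& q, const T& v) {
    T dropped;
    while (!q.try_push(v)) {
        q.try_pop(dropped);  // may fail if the consumer just took the value
    }
}
```

Note that the producer can occasionally drop one more value than strictly necessary if the consumer is in the middle of a pop at that moment. Whether that is acceptable depends on the application.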
What am I missing??
Lots:
- say you consume a t while it's being overwritten by the producer - how're you detecting/handling that?
- many options - e.g. do { copy the value out; check copy has integrity using modification sequence num etc. } while (corrupt)
- using atomic numbers isn't enough - you also need to use CAS-style loops to effect the index increments (though I do assume you know that, given you say you've read extensively on this already)
- memory barriers
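To make the "modification sequence num" option above concrete, here is a rough sketch of one slot guarded by a sequence counter, assuming C++11 std::atomic and a single writer. Sample and VersionedSlot are hypothetical names, and the two-int payload is only for illustration. The fences are where the "memory barriers" bullet comes in:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical payload, two words so a torn read is possible in principle.
struct Sample { std::int32_t a; std::int32_t b; };

class VersionedSlot {
    std::atomic<std::uint32_t> seq_{0};  // odd while a write is in progress
    std::atomic<std::int32_t> a_{0};
    std::atomic<std::int32_t> b_{0};
public:
    // Single writer only: bump to odd, write the payload, bump to even.
    void write(const Sample& s) {
        std::uint32_t v = seq_.load(std::memory_order_relaxed);
        assert((v & 1u) == 0);  // would fire if two writers overlapped
        seq_.store(v + 1, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_release);
        a_.store(s.a, std::memory_order_relaxed);
        b_.store(s.b, std::memory_order_relaxed);
        seq_.store(v + 2, std::memory_order_release);
    }

    // One attempt: copy the value out, then check that the counter was even
    // and did not change while copying. Returns false if the copy is suspect.
    bool try_read(Sample& out) const {
        std::uint32_t before = seq_.load(std::memory_order_acquire);
        if (before & 1u) return false;  // writer is mid-update
        Sample tmp;
        tmp.a = a_.load(std::memory_order_relaxed);
        tmp.b = b_.load(std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_acquire);
        std::uint32_t after = seq_.load(std::memory_order_relaxed);
        if (before != after) return false;  // overwritten while copying
        out = tmp;
        return true;
    }

    // The do { copy; check } while (corrupt) loop from the list above.
    Sample read() const {
        Sample s;
        while (!try_read(s)) { /* retry */ }
        return s;
    }
};
```

In a ring buffer you would keep one such counter per slot. A consumer that sees a failed check may prefer to move on to the next slot rather than spin, since the value it wanted has been overwritten anyway.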
But, let's write that off as being below your pseudo-code level, and consider your explicit question:
- point (5) will actually require a CAS operation. If the readIndex was correctly sampled/copied at the top of consume() - before the (possibly corrupt) t was copied - then the CAS instruction will fail if it's already been incremented by the producer. Instead of the usual resample and retry CAS, just continue.
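As a small sketch of what that CAS at (5) might look like, assuming C++11 std::atomic in place of the asker's Atomic< T > (advance_if_unchanged is a hypothetical helper name, not something from the question):

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper for step (5): advance readIndex only if it still holds
// the value sampled at the top of consume(). Returns false when someone else
// (here, the producer) has already moved it; the caller does not retry.
inline bool advance_if_unchanged(std::atomic<unsigned>& readIndex,
                                 unsigned sampled, unsigned size) {
    assert(size > 0);
    unsigned expected = sampled;
    return readIndex.compare_exchange_strong(expected, (sampled + 1) % size);
}
```

What consume() does with a t it copied before a failed CAS (discard it, or validate it with a sequence number) is left open here. Also note that because the indices wrap, a CAS on the bare index cannot tell "unchanged" from "went all the way around the buffer", so that case needs separate thought.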
Here is the code for a circular buffer on atomic variables that I recently created. I have modified it to overwrite data instead of returning false. Disclaimer: it has not been tested to production grade yet.
#include <atomic>
template<int capacity, int gap, typename T> class nonblockigcircular {
/*
* capacity - size of circular buffer
* gap - minimum safety distance between head and tail to start insertion operation
* generally gap should exceed number of threads attempting insertion concurrently
* capacity should be gap plus desired buffer size
* T - is a data type for buffer to keep
*/
volatile T buf[capacity]; // buffer
std::atomic<int> t{0}, h{0}, ph{0}, wh{0}; // start empty: all indices at zero
/* t to h data available for reading
* h to ph - zone where data is likely written but h is not updated yet
* to make sure data is written check if ph==wh
* ph to wh - zone where data changes in progress
*/
public:
bool pop(T &pwk) {
int td, tnd;
do {
int hd=h.load()%capacity;
td=t.load()%capacity;
if(hd==td) return false;
tnd=(td+1)%capacity;
} while(!t.compare_exchange_weak(td, tnd));
pwk=buf[td];
return true;
}
const int count() {
return ( h.load()+capacity-t.load() ) % capacity;
}
bool push(const T &pwk) {
const int tt=t.load();
int hd=h.load();
if( capacity - (hd+capacity-tt) % capacity < gap) {
// Buffer is too full to insert
// return false;
// or delete last record as below
int nt=t.fetch_add(1);
if(nt==capacity-1) t.fetch_sub(capacity);
}
int nwh=wh.fetch_add(1);
if(nwh==capacity-1) wh.fetch_sub(capacity);
buf[nwh%capacity]=pwk;
int nph=ph.fetch_add(1);
if(nph==capacity-1) ph.fetch_sub(capacity);
if(nwh==nph) {
int ohd=hd;
while(! h.compare_exchange_weak(hd, nwh) ) {
hd=h.load();
if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
}
}
return true;
}
};