Is this lock-free .NET queue thread safe?

Developers https://www.devze.com 2022-12-08 16:22 Source: the web
My question is: is the class included below, a single-reader single-writer queue, thread-safe? This kind of queue is called lock-free, even though it will block if the queue fills up. The data structure was inspired by Marc Gravell's implementation of a blocking queue here at Stack Overflow.

The point of the structure is to allow a single thread to write data to the buffer, and another thread to read data. All of this needs to happen as quickly as possible.

A similar data structure is described in an article at DDJ by Herb Sutter, except that his implementation is in C++. Another difference is that instead of a vanilla linked list, I use a linked list of arrays.

Rather than just including a snippet, I include the whole thing, with comments, under a permissive open source license (MIT License 1.0), in case anyone finds it useful and wants to use it (as-is or modified).

This is related to other questions asked on Stack Overflow about how to create blocking concurrent queues (see Creating a blocking Queue in .NET and Thread-safe blocking queue implementation in .NET).

Here is the code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / single writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next node,
        /// and a count of the occupied items.
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The next node in the linked list, or null if this is the last node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could be looking at the pointer while we set it, so this should be 
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writes. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use Interlocked to ensure that we increment the count 
                    // atomically, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue. Returns false only if 
            /// there is no more data and the writer has marked the queue as completed.
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can't place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and the writer has not flagged completion.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it itself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can't add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}


Microsoft Research CHESS should prove to be a good tool for testing your implementation.


The presence of Sleep() makes a lock-free approach totally useless. The only reason to confront the complexities of a lock-free design is the need for absolute speed and to avoid the cost of Semaphores. The use of Sleep(1) defeats that purpose totally.
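
As a rough illustration of the point about Sleep(1), here is a minimal sketch of a polling wait built on System.Threading.SpinWait, which spins briefly before it starts yielding the processor. The PollingWait class and its WaitUntil method are hypothetical names invented for this example; they are not part of the queue in the question, and this is only one possible way to replace the fixed sleep.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of the original queue.
static class PollingWait
{
    // Waits until condition() is true (returns true), or until the producer
    // has completed and the condition is still false (returns false).
    public static bool WaitUntil(Func<bool> condition, Func<bool> isCompleted)
    {
        var spinner = new SpinWait();
        while (true)
        {
            // Sample the completion flag BEFORE checking for data, so data
            // written just before completion was flagged is not missed.
            bool done = isCompleted();
            if (condition()) return true;
            if (done) return false;

            // Busy-spins on the first iterations, then begins yielding
            // the time slice, so short waits avoid a full sleep.
            spinner.SpinOnce();
        }
    }
}
```

In the reader loop of the question, condition would correspond to "there is an item to read" and isCompleted to the completed flag. Both delegates are assumptions of this sketch, and the flag would still need to be read in a way that is visible across threads.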


Given that I can't find any reference saying that Interlocked.Exchange blocks reads or writes, I would say not. I would also question why you want to go lockless, as it seldom gives enough benefit to counter its complexity.

Microsoft had an excellent presentation at the 2009 GDC on this, and you can get the slides here.


Beware of the double checked - single lock pattern (as in a link quoted above: http://www.yoda.arachsys.com/csharp/singleton.html)

Quoting verbatim from the "Modern C++ Design" by Andrei Alexandrescu

    Very experienced multithreaded programmers know that even the Double-Checked Locking pattern, although correct on paper, is not always correct in practice. In certain symmetric multiprocessor environments (the ones featuring the so-called relaxed memory model), the writes are committed to the main memory in bursts, rather than one by one. The bursts occur in increasing order of addresses, not in chronological order. Due to this rearranging of writes, the memory as seen by one processor at a time might look as if the operations are not performed in the correct order by another processor. Concretely, the assignment to pInstance_ performed by a processor might occur before the Singleton object has been fully initialized! Thus, sadly, the Double-Checked Locking pattern is known to be defective for such systems
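
For the singleton case that the quote describes, a hedged sketch of one way to sidestep hand-written double-checked locking in .NET is to let Lazy<T> handle the publication. The Settings class below is a hypothetical example type, and this assumes a framework version that provides System.Lazy<T> (.NET 4 or later).

```csharp
using System;

// Hypothetical singleton used only for illustration.
sealed class Settings
{
    // Lazy<T> constructed with only a factory uses
    // LazyThreadSafetyMode.ExecutionAndPublication: the factory runs
    // at most once, and every thread sees the fully built instance.
    private static readonly Lazy<Settings> instance =
        new Lazy<Settings>(() => new Settings());

    public static Settings Instance
    {
        get { return instance.Value; }
    }

    public DateTime CreatedAt { get; private set; }

    private Settings()
    {
        CreatedAt = DateTime.UtcNow;
    }
}
```

Every caller of Settings.Instance receives the same object, without the caller or the class writing any explicit lock or null check.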


I suspect it is not thread safe - imagine the following scenario:

Two threads enter cursor.Write. The first gets as far as the line node = new Node(x, node); in the true branch of the if (current == BUFFER_SIZE) statement (so let's assume that current == BUFFER_SIZE). Once current has been reset to 1, another thread coming in would follow the other path through the if statement. Now imagine that thread 1 loses its time slice before that happens, and thread 2 gets it and proceeds to enter the if statement in the mistaken belief that the condition still holds. It should have taken the other path.

I haven't run this code either, so I'm not sure if my assumptions are possible in this code, but if they are (i.e. entering cursor.Write from multiple threads when current == BUFFER_SIZE), then it may well be prone to concurrency errors.


First, I wonder about the assumption in these two lines of sequential code:

                node.data[current++] = x;

                // We have to use Interlocked to ensure that we increment the count 
                // atomically, because the reader could be reading it.
                Interlocked.Increment(ref node.count);

What is to say that the new value of node.data[] has been committed to this memory location? It is not stored in a volatile memory address and therefore can be cached if I understand it correctly? Doesn't this potentially lead to a 'dirty' read? There may be other places the same is true, but this one stood out at a glance.
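
One way to make the intended ordering explicit is to publish the count with a release-style write and read it with an acquire-style read. The sketch below is illustrative only: PublishedBuffer and its members are hypothetical names, it assumes exactly one writer thread and one reader thread, and it assumes a framework that provides System.Threading.Volatile (.NET 4.5 or later).

```csharp
using System.Threading;

// Hypothetical single-writer / single-reader buffer for illustration.
sealed class PublishedBuffer
{
    private readonly int[] data = new int[512];
    private int count; // number of items the writer has published

    // Called by the writer thread only.
    public void Add(int value)
    {
        int index = count;   // only the writer ever modifies count
        data[index] = value; // 1. store the item first
        // 2. publish it: the store above cannot move after this write
        Volatile.Write(ref count, index + 1);
    }

    // Called by the reader thread only.
    public bool TryGet(int index, out int value)
    {
        // The read of data[index] below cannot move before this read,
        // so seeing count > index means the item is visible too.
        if (index < Volatile.Read(ref count))
        {
            value = data[index];
            return true;
        }
        value = 0;
        return false;
    }
}
```

The design choice here is that the item store always comes before the count update on the writer side, and the count check always comes before the item load on the reader side.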

Second, multi-threaded code that contains the following:

Thread.Sleep(int);

... is never a good sign. If it's required then the code is destined to fail; if it isn't required it's a waste. I really wish they would remove this API entirely. Realize that it is a request to wait at least that amount of time. With the overhead of context switching you're almost certainly going to wait longer, a lot longer.
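
To illustrate the alternative to polling with Sleep, here is a minimal sketch in which the reader blocks on a signal that the writer raises for each item. It is deliberately not lock-free: it uses an ordinary lock plus a SemaphoreSlim, and SignaledQueue is a hypothetical name for this example only (SemaphoreSlim requires .NET 4 or later).

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical signal-based queue for illustration; NOT lock-free.
sealed class SignaledQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();

    // Counts the items that are available to read.
    private readonly SemaphoreSlim available = new SemaphoreSlim(0);

    public void Write(T item)
    {
        lock (items) { items.Enqueue(item); }
        available.Release(); // wake a waiting reader, if any
    }

    // Waits up to timeoutMs milliseconds for an item.
    public bool TryRead(out T item, int timeoutMs)
    {
        if (!available.Wait(timeoutMs))
        {
            item = default(T);
            return false;
        }
        lock (items) { item = items.Dequeue(); }
        return true;
    }
}
```

The reader wakes as soon as an item is written rather than after a fixed sleep interval, at the cost of taking a lock on every operation.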

Third, I completely don't understand the use of the Interlock API here. Maybe I'm tired and just missing the point; but I can't find the potential thread conflict on both threads reading & writing to the same variable? It would seem that the only use I could find for interlock exchange would be to modify the contents of node.data[] to fix #1 above.

Lastly, it would seem that the implementation is somewhat over-complicated. Am I missing the point of the whole Cursor/Node thing, or is it basically doing the same thing as this class? (Note: I haven't tried it and I don't think this is thread safe either; I'm just trying to boil down what I think you're doing.)

using System.Threading;

class ReaderWriterQueue<T>
{
    readonly AutoResetEvent _readComplete;
    readonly T[] _buffer;
    readonly int _maxBuffer;
    int _readerPos, _writerPos;

    public ReaderWriterQueue(int maxBuffer)
    {
        _readComplete = new AutoResetEvent(true);
        _maxBuffer = maxBuffer;
        _buffer = new T[_maxBuffer];
        _readerPos = _writerPos = 0;
    }

    // Advances an index by one slot, wrapping around at the end of the buffer.
    public int Next(int current) { return ++current == _maxBuffer ? 0 : current; }

    public bool Read(ref T item)
    {
        if (_readerPos != _writerPos)
        {
            item = _buffer[_readerPos];
            _readerPos = Next(_readerPos);

            // Wake the writer in case it is waiting for a free slot.
            _readComplete.Set();
            return true;
        }
        else
            return false;
    }

    public void Write(T item)
    {
        int next = Next(_writerPos);

        // The buffer is full when advancing the writer would catch up with the reader.
        while (next == _readerPos)
            _readComplete.WaitOne();

        // Store into the current writer slot, then publish it by advancing the position.
        _buffer[_writerPos] = item;
        _writerPos = next;
    }
}

So am I totally off-base here and am failing to see the magic in the original class?

I must admit one thing, I despise Threading. I've seen the best developers fail at it. This article gives a great example on how hard it is to get threading right: http://www.yoda.arachsys.com/csharp/singleton.html
