
How to work with a queue simultaneously in different threads

How do I work with a queue in C#? I want one thread that will enqueue data to the queue and another thread that will dequeue data from the queue. Those threads should run simultaneously.

Is it possible?


If you need thread safety, use ConcurrentQueue<T>.
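For example, a minimal producer/consumer sketch with ConcurrentQueue<T> (assuming .NET 4 or later; the thread bodies and the item count of 100 are illustrative, not part of the original answer):

using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        var producer = new Thread(() =>
        {
            for (int i = 0; i < 100; i++)
                queue.Enqueue(i);                // safe to call from any thread
        });

        var consumer = new Thread(() =>
        {
            int received = 0;
            while (received < 100)
            {
                int item;
                if (queue.TryDequeue(out item))  // returns false while the queue is empty
                {
                    Console.WriteLine(item);
                    received++;
                }
                // (busy-waits when the queue is empty, just to keep the sketch short)
            }
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();
    }
}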


If you use the non-generic System.Collections.Queue, you get thread safety through the wrapper returned by Queue.Synchronized:

var queue = new Queue();
var syncQueue = Queue.Synchronized(queue);   // thread-safe wrapper around the same underlying queue
syncQueue.Enqueue(new WorkItem());
syncQueue.Enqueue(new WorkItem());
syncQueue.Clear();

If you want to use System.Collections.Generic.Queue<T>, then create your own wrapper class. I already did this with System.Collections.Generic.Stack<T>:

using System;
using System.Collections.Generic;

[Serializable]
public class SomeStack
{
    private readonly object stackLock = new object();

    private readonly Stack<WorkItem> stack;

    public SomeStack()
    {
        this.stack = new Stack<WorkItem>();
    }

    public WorkItem Push(WorkItem context)
    {
        lock (this.stackLock)
        {
            this.stack.Push(context);
        }

        return context;
    }

    public WorkItem Pop()
    {
        lock (this.stackLock)
        {
            return this.stack.Pop();
        }
    }
}
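The analogous wrapper around System.Collections.Generic.Queue<T>, which is what the question actually asks for, would look much the same. A sketch along those lines (WorkItem is the same placeholder type as above; the class name and the TryDequeue member are my own, not from the original answer):

using System.Collections.Generic;

public class SomeQueue
{
    private readonly object queueLock = new object();

    private readonly Queue<WorkItem> queue = new Queue<WorkItem>();

    public void Enqueue(WorkItem item)
    {
        lock (this.queueLock)
        {
            this.queue.Enqueue(item);
        }
    }

    // Returns true and passes the item out if one was available, false otherwise.
    public bool TryDequeue(out WorkItem item)
    {
        lock (this.queueLock)
        {
            if (this.queue.Count > 0)
            {
                item = this.queue.Dequeue();
                return true;
            }

            item = default(WorkItem);
            return false;
        }
    }
}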


One possible implementation is to use a ring buffer with separate read and write pointers. On each read or write operation you copy the opposite pointer (in a thread-safe way) into your local context and then perform batched reads or writes.

After each read or write you update your own pointer and pulse an event.

If the reading or writing thread runs out of work, it waits on the other thread's event before re-reading the appropriate pointer.
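A minimal sketch of that idea (without the batching), assuming a single producer and a single consumer and .NET 4.5 or later for System.Threading.Volatile; the class and member names are made up for illustration:

using System.Threading;

public class RingBufferQueue<T>
{
    private readonly T[] _buffer;
    private int _writePos;   // advanced only by the producer thread
    private int _readPos;    // advanced only by the consumer thread
    private readonly AutoResetEvent _dataAvailable = new AutoResetEvent(false);
    private readonly AutoResetEvent _spaceAvailable = new AutoResetEvent(false);

    public RingBufferQueue(int capacity)
    {
        // One slot stays empty so that "full" and "empty" can be told apart.
        _buffer = new T[capacity + 1];
    }

    public void Enqueue(T item)   // called by the producer only
    {
        int next = (_writePos + 1) % _buffer.Length;

        // Buffer full: wait for the consumer's event, then re-read its pointer.
        while (next == Volatile.Read(ref _readPos))
            _spaceAvailable.WaitOne();

        _buffer[_writePos] = item;
        Volatile.Write(ref _writePos, next);  // publish the write...
        _dataAvailable.Set();                 // ...and pulse the consumer's event
    }

    public T Dequeue()            // called by the consumer only
    {
        // Buffer empty: wait for the producer's event, then re-read its pointer.
        while (_readPos == Volatile.Read(ref _writePos))
            _dataAvailable.WaitOne();

        T item = _buffer[_readPos];
        Volatile.Write(ref _readPos, (_readPos + 1) % _buffer.Length);
        _spaceAvailable.Set();
        return item;
    }
}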


You can implement a thread-safe queue using atomic operations. I once wrote the following class for a multi-player game. It allows multiple threads to safely write to the queue, and a single other thread to safely read from the queue:

using System.Collections.Generic;

/// <summary>
/// The WaitFreeQueue class implements the Queue abstract data type through a linked list. The WaitFreeQueue
/// allows thread-safe addition and removal of elements using atomic operations. Multiple threads can add
/// elements simultaneously, and another thread can remove elements from the queue at the same time. Only one
/// thread can remove elements from the queue at any given time.
/// </summary>
/// <typeparam name="T">The type parameter</typeparam>
public class WaitFreeQueue<T>
{
    // Private fields
    // ==============
    #region Private fields
    private Node<T> _tail;  // The tail of the queue.
    private Node<T> _head;  // The head of the queue.
    #endregion



    // Public methods
    // ==============
    #region Public methods
    /// <summary>
    /// Removes the first item from the queue. This method returns a value to indicate if an item was
    /// available, and passes the item back through an argument.
    /// This method is not thread-safe in itself (only one thread can safely access this method at any
    /// given time) but it is safe to call this method while other threads are enqueueing items.
    /// 
    /// If no item was available at the time of calling this method, the returned value is initialised
    /// to the default value that matches this instance's type parameter. For reference types, this is
    /// a Null reference.
    /// </summary>
    /// <param name="value">The value.</param>
    /// <returns>A boolean value indicating if an element was available (true) or not.</returns>
    public bool Dequeue(ref T value)
    {
        bool succeeded = false;
        value = default(T);

        // If there is an element on the queue then we get it.
        if (null != _head)
        {
            // Set the head to the next element in the list, and retrieve the old head.
            Node<T> head = System.Threading.Interlocked.Exchange<Node<T>>(ref _head, _head.Next);

            // Sever the element we just pulled off the queue.
            head.Next = null;

            // We have succeeded.
            value = head.Value;
            succeeded = true;
        }

        return succeeded;
    }

    /// <summary>
    /// Adds another item to the end of the queue. This operation is thread-safe, and multiple threads
    /// can enqueue items while a single other thread dequeues items.
    /// </summary>
    /// <param name="value">The value to add.</param>
    public void Enqueue(T value)
    {
        // We create a new node for the specified value, and point it to itself.
        Node<T> newNode = new Node<T>(value);

        // In one atomic operation, set the tail of the list to the new node, and remember the old tail.
        Node<T> previousTail = System.Threading.Interlocked.Exchange<Node<T>>(ref _tail, newNode);

        // Link the previous tail to the new tail.
        if (null != previousTail)
            previousTail.Next = newNode;

        // If this is the first node in the list, we save it as the head of the queue.
        System.Threading.Interlocked.CompareExchange<Node<T>>(ref _head, newNode, null);
    } // Enqueue()
    #endregion



    // Public constructor
    // ==================
    #region Public constructor
    /// <summary>
    /// Constructs a new WaitFreeQueue instance.
    /// </summary>
    public WaitFreeQueue() { }

    /// <summary>
    /// Constructs a new WaitFreeQueue instance based on the specified list of items.
    /// The items will be enqueued. The list can be a Null reference.
    /// </summary>
    /// <param name="items">The items</param>
    public WaitFreeQueue(IEnumerable<T> items)
    {
        if (null != items)
            foreach (T item in items)
                this.Enqueue(item);
    }
    #endregion



    // Private types
    // =============
    #region Private types
    /// <summary>
    /// The Node class represents a single node in the linked list of a WaitFreeQueue.
    /// It contains the queued-up value and a reference to the next node in the list.
    /// </summary>
    /// <typeparam name="U">The type parameter.</typeparam>
    private class Node<U>
    {
        // Public fields
        // =============
        #region Public fields
        public Node<U> Next;
        public U Value;
        #endregion



        // Public constructors
        // ===================
        #region Public constructors
        /// <summary>
        /// Constructs a new node with the specified value.
        /// </summary>
        /// <param name="value">The value</param>
        public Node(U value)
        {
            this.Value = value;
        }
        #endregion

    } // Node generic class
    #endregion

} // WaitFreeQueue class

If the restriction of having only a single dequeueing thread while multiple threads enqueue is OK with you, then you could use that. It was great for the game because it meant no locking was required.
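For illustration, usage with the class above could look like this (the int payload and the values are just examples):

var queue = new WaitFreeQueue<int>();

// Any number of producer threads can do this concurrently:
queue.Enqueue(42);

// Only one consumer thread at a time may do this:
int value = 0;
if (queue.Dequeue(ref value))
    Console.WriteLine("Got " + value);
else
    Console.WriteLine("Queue was empty");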


A simple usage example would be:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ConsoleApplication1
{
    class Program
    {

        static void Main(string[] args)
        {
            ExampleQueue eq = new ExampleQueue();
            eq.Run();

            // Wait...
            System.Threading.Thread.Sleep(100000);
        }


    }

    class ExampleQueue
    {
        private Queue<int> _myQueue = new Queue<int>();

        public void Run()
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(PushToQueue), null);
            ThreadPool.QueueUserWorkItem(new WaitCallback(PopFromQueue), null);
        }

        private void PushToQueue(object Dummy)
        {
            for (int i = 0; i <= 1000; i++)
            {
                lock (_myQueue)
                {
                    _myQueue.Enqueue(i);
                }
            }
            System.Console.WriteLine("END PushToQueue");

        }

        private void PopFromQueue(object Dummy)
        {
            int dataElementFromQueue = -1;
            // Keep polling until the final value (1000) has been dequeued.
            // Note: this loop busy-waits while the queue is empty; the blocking
            // queue mentioned in the last answer avoids that spinning.
            while (dataElementFromQueue < 1000)
            {
                lock (_myQueue)
                {
                    if (_myQueue.Count > 0)
                    {
                        dataElementFromQueue = _myQueue.Dequeue();

                        // Do something with dataElementFromQueue...
                        System.Console.WriteLine("Dequeued " + dataElementFromQueue);
                    }
                }
            }
            System.Console.WriteLine("END PopFromQueue");

        }
    }
}


You might want to use a blocking queue, where the thread popping from the queue waits until some data is available.

See: Creating a blocking Queue<T> in .NET?
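For reference, .NET 4 and later also ship a ready-made blocking queue: BlockingCollection<T>, which wraps a ConcurrentQueue<T> by default. A minimal sketch (assuming .NET 4.5 or later for Task.Run; the item count of 1000 mirrors the example above):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingQueueExample
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();   // backed by a ConcurrentQueue<int> by default

        var producer = Task.Run(() =>
        {
            for (int i = 0; i <= 1000; i++)
                queue.Add(i);
            queue.CompleteAdding();                  // signal that no more items will arrive
        });

        var consumer = Task.Run(() =>
        {
            // GetConsumingEnumerable blocks while the queue is empty and ends
            // once CompleteAdding has been called and the queue is drained.
            foreach (int item in queue.GetConsumingEnumerable())
                Console.WriteLine("Dequeued " + item);
        });

        Task.WaitAll(producer, consumer);
    }
}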
