C# producer/consumer

Developer https://www.devze.com 2022-12-10 09:36 Source: Internet
I've recently come across a producer/consumer pattern C# implementation. It's very simple and (for me at least) very elegant.

It seems to have been devised around 2006, so I was wondering if this implementation is:

- safe

- still applicable

Code is below (original code was referenced at http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}


The code is older than that - I wrote it some time before .NET 2.0 came out. The concept of a producer/consumer queue is way older than that though :)

Yes, that code is safe as far as I'm aware - but it has some deficiencies:

  • It's non-generic. A modern version would certainly be generic.
  • It has no way of stopping the queue. One simple way of stopping the queue (so that all the consumer threads retire) is to have a "stop work" token which can be put into the queue. You then add as many tokens as you have threads. Alternatively, you have a separate flag to indicate that you want to stop. (This allows the other threads to stop before finishing all the current work in the queue.)
  • If the jobs are very small, consuming a single job at a time may not be the most efficient thing to do.
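
As a rough sketch of how those three points could be addressed together (this is not the original author's code; StoppableQueue, Stop and TryConsumeBatch are names made up for illustration), one might make the queue generic, add a stop flag, and let a consumer take several items per lock acquisition:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: a generic variant of the queue with a stop flag
// and batch consumption. A sketch, not a hardened implementation.
public class StoppableQueue<T>
{
    private readonly object sync = new object();
    private readonly Queue<T> queue = new Queue<T>();
    private bool stopped;

    public void Produce(T item)
    {
        lock (sync)
        {
            if (stopped) throw new InvalidOperationException("Queue has been stopped.");
            queue.Enqueue(item);
            Monitor.Pulse(sync);
        }
    }

    // Sets the flag and wakes every waiting consumer so each can retire.
    public void Stop()
    {
        lock (sync)
        {
            stopped = true;
            Monitor.PulseAll(sync);
        }
    }

    // Blocks until at least one item is available or the queue is stopped.
    // Fills 'batch' with up to maxItems items and returns true, or returns
    // false once stopped (items still queued at that point are abandoned).
    public bool TryConsumeBatch(int maxItems, List<T> batch)
    {
        batch.Clear();
        lock (sync)
        {
            while (queue.Count == 0 && !stopped)
            {
                Monitor.Wait(sync);
            }
            if (stopped) return false;
            while (queue.Count > 0 && batch.Count < maxItems)
            {
                batch.Add(queue.Dequeue());
            }
            return true;
        }
    }
}
```

Here the flag takes priority over queued work, which matches the "stop before finishing all the current work" variant; a "stop work" token placed in the queue would instead let consumers drain everything ahead of it first.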

The ideas behind the code are more important than the code itself, to be honest.


You could do something like the following code snippet. It's generic and has a method for enqueueing nulls (or whatever flag you'd like to use) to tell the worker threads to exit.

The code is taken from here: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}


Back in the day I learned how Monitor.Wait/Pulse works (and a lot about threads in general) from the above piece of code and the article series it is from. So as Jon says, it has a lot of value to it and is indeed safe and applicable.

However, as of .NET 4, there is a producer-consumer queue implementation in the framework. I only just found it myself but up to this point it does everything I need.
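
The answer doesn't name the type, but it is presumably BlockingCollection<T> from System.Collections.Concurrent, which was added in .NET 4. A minimal sketch under that assumption (BlockingCollectionDemo and Run are made-up names):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionDemo
{
    public static List<int> Run()
    {
        var consumed = new List<int>();
        using (var queue = new BlockingCollection<int>())
        {
            // Consumer: GetConsumingEnumerable blocks while the collection
            // is empty and ends once CompleteAdding has been called and
            // all remaining items have been taken.
            var consumer = new Thread(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });
            consumer.Start();

            // Producer: add ten items, then signal that no more will come.
            for (int i = 0; i < 10; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            consumer.Join();
        }
        return consumed;
    }
}
```

CompleteAdding plays the role of the "stop work" token discussed earlier: consumers finish the remaining items and then their loops end on their own.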


These days a more modern option is available using the namespace System.Threading.Tasks.Dataflow. It's async/await friendly and much more versatile.

More info here: "How to: Implement a producer-consumer dataflow pattern" (Microsoft documentation).

It's included starting with .NET Core; for older .NET versions you may need to install the NuGet package with the same name as the namespace.
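
For a rough idea of what that looks like, here is a minimal sketch using BufferBlock<T> as the queue (DataflowDemo and its method names are made up for illustration, and it assumes a single consumer):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    // Producer: post items to the block, then mark it complete.
    static async Task ProduceAsync(ITargetBlock<int> target, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await target.SendAsync(i);
        }
        target.Complete();
    }

    // Consumer: OutputAvailableAsync yields false once the block is
    // completed and empty, which ends the loop.
    static async Task<List<int>> ConsumeAsync(ISourceBlock<int> source)
    {
        var items = new List<int>();
        while (await source.OutputAvailableAsync())
        {
            items.Add(await source.ReceiveAsync());
        }
        return items;
    }

    public static async Task<List<int>> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        Task<List<int>> consumer = ConsumeAsync(buffer);
        await ProduceAsync(buffer, 10);
        return await consumer;
    }
}
```

Neither side blocks a thread while waiting, which is the main difference from the Monitor-based versions above.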

I know the question is old, but it's the first match in Google for my request, so I decided to update the topic.


A modern and simple way to implement the producer/consumer pattern in C# is to use System.Threading.Channels. It's asynchronous and uses ValueTask to reduce memory allocations. Here is an example:

using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ProducerConsumer<T>
{
    protected readonly Channel<T> JobChannel = Channel.CreateUnbounded<T>();

    public IAsyncEnumerable<T> GetAllAsync()
    {
        return JobChannel.Reader.ReadAllAsync();
    }

    public async ValueTask AddAsync(T job)
    {
        await JobChannel.Writer.WriteAsync(job);
    }

    public async ValueTask AddAsync(IEnumerable<T> jobs)
    {
        foreach (var job in jobs)
        {
            await JobChannel.Writer.WriteAsync(job);
        }
    }
}
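
One thing worth knowing when consuming: ReadAllAsync only finishes once the channel's writer has been completed, so something has to call Writer.Complete() when production is done. A self-contained sketch of the whole round trip using the channel directly (ChannelDemo is a made-up name; it assumes C# 8 or later for await foreach):

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    public static async Task<List<int>> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Consumer: the loop ends when the writer is completed
        // and every buffered item has been read.
        Task<List<int>> consumer = Task.Run(async () =>
        {
            var items = new List<int>();
            await foreach (int item in channel.Reader.ReadAllAsync())
            {
                items.Add(item);
            }
            return items;
        });

        // Producer: write ten items, then signal completion.
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();

        return await consumer;
    }
}
```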


Warning: If you read the comments, you'll understand my answer is wrong :)

There's a possible deadlock in your code.

Imagine the following case. For clarity I used a single-threaded approach, but it should be easy to convert to a multi-threaded one with sleeps:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

Please do let me know if this doesn't make any sense.

If this is confirmed, then the answer to your question is, "no, it isn't safe" ;) I hope this helps.
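
For comparison, here is a small sketch of why the queue in the question does not seem to have this problem. The snippet above calls Monitor.Wait unconditionally, whereas the question's Consume checks queue.Count first, so a Pulse that fires while nobody is waiting is not lost. The class below repeats the question's ProducerConsumer in condensed form; PulseBeforeWaitDemo is a made-up name for illustration.

```csharp
using System;
using System.Collections;
using System.Threading;

// Condensed copy of the ProducerConsumer class from the question.
public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // The state check comes before Wait, so an item that was
            // enqueued earlier is picked up without waiting for a pulse.
            while (queue.Count == 0)
            {
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}

public static class PulseBeforeWaitDemo
{
    public static object Run()
    {
        var queue = new ProducerConsumer();
        queue.Produce(42);       // Pulse fires while no thread is waiting
        return queue.Consume();  // Count is 1, so Wait is never called
    }
}
```

The pulse is only a hint to re-check the queue; the queue's contents are what actually decide whether a consumer blocks.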


using System;
using System.Threading;

public class ProducerConsumerProblem
{
    private readonly int n;
    private readonly object obj = new object();

    public ProducerConsumerProblem(int n)
    {
        this.n = n;
    }

    public void Producer()
    {
        for (int i = 0; i < n; i++)
        {
            lock (obj)
            {
                Console.Write("Producer =>");
                Monitor.Pulse(obj);   // wake the consumer if it is waiting
                Thread.Sleep(1);      // note: sleeps while still holding the lock
                Monitor.Wait(obj);    // release the lock until the consumer pulses back
            }
        }
    }

    public void Consumer()
    {
        lock (obj)
        {
            for (int i = 0; i < n; i++)
            {
                // Wait up to 10 ms for the producer's pulse. On timeout the
                // consumer carries on anyway, so the strict alternation shown
                // in the output depends on timing rather than on shared state.
                Monitor.Wait(obj, 10);
                Console.Write("<= Consumer");
                Monitor.Pulse(obj);
                Console.WriteLine();
            }
        }
    }
}

public class Program
{
    static void Main(string[] args)
    {
        ProducerConsumerProblem f = new ProducerConsumerProblem(10);
        Thread t1 = new Thread(() => f.Producer());
        Thread t2 = new Thread(() => f.Consumer());
        // Background threads do not keep the process alive; ReadLine does.
        t1.IsBackground = true;
        t2.IsBackground = true;
        t1.Start();
        t2.Start();
        Console.ReadLine();
    }
}

Output:

Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer