How should I write producer/consumer code in C#?

Posted on https://www.devze.com, 2023-01-15 02:20. Source: web.
I have one thread streaming data and a second one (from the thread pool) processing it. Processing takes around 100 ms, so I use the second thread to avoid holding up the first.

While the second thread is processing, the first thread adds incoming data to a dictionary cache; once the second thread has finished, the cached values are processed.

My question: is this how I should be writing producer/consumer code in C#?

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public delegate void OnValue(ulong value);

public class Runner
{
    public event OnValue OnValueEvent;
    private readonly IDictionary<string, ulong> _cache = new Dictionary<string, ulong>(StringComparer.InvariantCultureIgnoreCase);
    private readonly AutoResetEvent _cachePublisherWaitHandle = new AutoResetEvent(true);

    public void Start()
    {
        for (ulong i = 0; i < 500; i++)
        {
            DataStreamHandler(i.ToString(), i);
        }
    }

    private void DataStreamHandler(string id, ulong value)
    {
        _cache[id] = value;

        if (_cachePublisherWaitHandle.WaitOne(1))
        {
            IList<ulong> tempValues = new List<ulong>(_cache.Values);
            _cache.Clear();

            _cachePublisherWaitHandle.Reset();

            ThreadPool.UnsafeQueueUserWorkItem(delegate
            {
                try
                {
                    foreach (ulong value1 in tempValues)
                        if (OnValueEvent != null)
                            OnValueEvent(value1);
                }
                finally
                {
                    _cachePublisherWaitHandle.Set();
                }
            }, null);
        }
        else
        {
            Console.WriteLine(string.Format("Buffered value: {0}.", value));
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        Stopwatch sw = Stopwatch.StartNew();
        Runner r = new Runner();
        r.OnValueEvent += delegate(ulong x)
                              {
                                  Console.WriteLine(string.Format("Processed value: {0}.", x));
                                  Thread.Sleep(100);

                                  if(x == 499)
                                  {
                                      sw.Stop();
                                      Console.WriteLine(string.Format("Time: {0}.", sw.ElapsedMilliseconds));
                                  }
                              };
        r.Start();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
}


The best practice for setting up the producer-consumer pattern is to use the BlockingCollection<T> class, which is available in .NET 4.0 or as a separate download as part of the Reactive Extensions framework. The idea is that producers enqueue items with the Add method and consumers dequeue them with the Take method, which blocks while the queue is empty. As SwDevMan81 pointed out, the Albahari site has a really good writeup on how to make it work correctly if you want to go the manual route.
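As a rough illustration of the BlockingCollection approach, here is a minimal sketch with one producer and one consumer. The class name BlockingCollectionSketch, the Run method, and the bounded capacity of 100 are assumptions made for this example, not part of the question's code. The consumer loops over GetConsumingEnumerable rather than calling Take directly, which has the same blocking behavior but ends cleanly once CompleteAdding has been called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionSketch
{
    // Runs one producer and one consumer and returns the values
    // in the order the consumer processed them.
    public static List<ulong> Run(ulong count, int boundedCapacity)
    {
        var processed = new List<ulong>();

        using (var queue = new BlockingCollection<ulong>(boundedCapacity))
        {
            // Consumer: GetConsumingEnumerable blocks while the queue is empty
            // and ends once CompleteAdding was called and the queue is drained.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (ulong value in queue.GetConsumingEnumerable())
                {
                    processed.Add(value); // stand-in for the 100 ms processing step
                }
            }, TaskCreationOptions.LongRunning);

            // Producer: Add blocks only when the bounded capacity is reached.
            for (ulong i = 0; i < count; i++)
            {
                queue.Add(i);
            }

            queue.CompleteAdding(); // signal that no more items will arrive
            consumer.Wait();        // wait until the consumer has drained the queue
        }

        return processed;
    }

    public static void Main()
    {
        List<ulong> result = Run(500, 100);
        Console.WriteLine("Processed {0} values.", result.Count);
    }
}
```

Unlike the dictionary cache in the question, every value is processed, in order, including the last ones. If only the latest value per id matters, the consumer could collapse duplicates itself after dequeuing.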


There is a good article on MSDN about Synchronizing the Producer and Consumer. There is also a good example on the Albahari site.
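For a sense of what the manual route involves, here is a minimal sketch of a blocking queue built on lock, Monitor.Wait and Monitor.Pulse. It is not taken from either of the articles mentioned; SimpleBlockingQueue, ManualQueueSketch and their members are hypothetical names chosen for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: a minimal unbounded blocking queue built on Monitor.
public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private bool _completed;

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            if (_completed) throw new InvalidOperationException("Queue has been completed.");
            _items.Enqueue(item);
            Monitor.Pulse(_sync); // wake one waiting consumer
        }
    }

    public void Complete()
    {
        lock (_sync)
        {
            _completed = true;
            Monitor.PulseAll(_sync); // wake every waiting consumer so it can exit
        }
    }

    // Blocks while the queue is empty; returns false once it is completed and empty.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            while (_items.Count == 0 && !_completed)
            {
                Monitor.Wait(_sync); // releases the lock while waiting
            }
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }
}

public static class ManualQueueSketch
{
    public static List<ulong> Run(ulong count)
    {
        var queue = new SimpleBlockingQueue<ulong>();
        var processed = new List<ulong>();

        var consumer = new Thread(() =>
        {
            ulong value;
            while (queue.TryDequeue(out value))
            {
                processed.Add(value); // stand-in for the real processing step
            }
        });
        consumer.Start();

        for (ulong i = 0; i < count; i++)
        {
            queue.Enqueue(i);
        }

        queue.Complete();
        consumer.Join(); // wait until the consumer has drained the queue
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed {0} values.", Run(500).Count);
    }
}
```

The while loop around Monitor.Wait matters: a woken consumer has to re-check the condition, because another consumer may have taken the item first or the wake-up may have come from Complete.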
