I want two threads to collaborate: a producer and a consumer. The consumer is rather slow, and the producer is very fast and works in bursts.
For example, the consumer can process one message every 20 seconds, and the producer can produce 10 messages in one second, but it does so only once in a long while, so the consumer can catch up.
I want something like:
Stream commonStream;
AutoResetEvent commonLock;

void Producer()
{
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        byte[] buffer = magic.Produce();
        commonStream.Write(buffer);
        commonLock.Set();
    }
}

void Consumer()
{
    while (true)
    {
        commonLock.WaitOne();
        MagicalObject o = binarySerializer.Deserialize(commonStream);
        DoSomething(o);
    }
}
If you have .NET 4.0 or higher, you can do this with a BlockingCollection:
int maxBufferCap = 500;
BlockingCollection<MagicalObject> Collection
    = new BlockingCollection<MagicalObject>(maxBufferCap);

void Producer()
{
    while (magic.HasMoreMagic)
    {
        this.Collection.Add(magic.ProduceMagic());
    }
    this.Collection.CompleteAdding();
}

void Consumer()
{
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable())
    {
        DoSomething(magicalObject);
    }
}
The foreach line will sleep if there is no data in the buffer, and it will automatically wake itself up when something is added to the collection.
The reason I set the max buffer is that if your producer is much faster than the consumer, you may end up consuming a lot of memory as more and more objects get put into the collection. With a max buffer size set when you create the blocking collection, once that size is reached the Add call on the producer will block until an item has been removed from the collection by the consumer.
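The blocking behaviour of a bounded collection can be sketched as follows. This is a minimal illustration, not part of the answer's code: the class name BoundedBufferDemo and both method names are made up for the example, and int items stand in for MagicalObject.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; the names are illustrative only.
public static class BoundedBufferDemo
{
    // Returns true if a non-blocking add is rejected once the buffer is full.
    public static bool FullBufferRejectsTryAdd(int capacity)
    {
        using (var buffer = new BlockingCollection<int>(capacity))
        {
            for (int i = 0; i < capacity; i++)
            {
                buffer.Add(i); // does not block: there is still room
            }
            // The buffer is full now. Add would block here, so TryAdd with a
            // zero timeout is used to observe the limit without hanging.
            return !buffer.TryAdd(-1, TimeSpan.Zero);
        }
    }

    // A producer thread pushes itemCount items through a buffer that holds at
    // most `capacity` of them; the calling thread consumes until completion.
    public static List<int> ProduceAndConsume(int capacity, int itemCount)
    {
        var consumed = new List<int>();
        using (var buffer = new BlockingCollection<int>(capacity))
        {
            var producer = new Thread(() =>
            {
                for (int i = 0; i < itemCount; i++)
                {
                    buffer.Add(i); // blocks while the buffer is full
                }
                buffer.CompleteAdding(); // lets the consuming loop finish
            });
            producer.Start();

            foreach (int item in buffer.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
            producer.Join();
        }
        return consumed;
    }
}
```

Calling BoundedBufferDemo.ProduceAndConsume(2, 10) never holds more than two items in the buffer at once, and with a single producer and a single consumer the items come out in the order they were added, because BlockingCollection uses a FIFO queue by default.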
Another bonus of the BlockingCollection class is that it can have as many producers and consumers as you want; it does not need to be a 1:1 ratio. If DoSomething supports it, you could have a foreach loop per core of the computer (or even use Parallel.ForEach with the consuming enumerable as the data source):
void ConsumersInParallel()
{
    // This assumes the method signature of DoSomething is one of the following:
    //   Action<MagicalObject>
    //   Action<MagicalObject, ParallelLoopState>
    //   Action<MagicalObject, ParallelLoopState, long>
    Parallel.ForEach(this.Collection.GetConsumingEnumerable(), DoSomething);
}
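The "one foreach loop per consumer" variant could look roughly like the sketch below. The class MultiConsumerDemo, the method SumWithConsumers, and the buffer size of 16 are assumptions made for the example; the work done per item (adding to a total) is only a stand-in for DoSomething.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the names are illustrative only.
public static class MultiConsumerDemo
{
    // Feeds 1..itemCount through one shared collection and lets
    // consumerCount consumers drain it; returns the sum they computed.
    public static long SumWithConsumers(int itemCount, int consumerCount)
    {
        long total = 0;
        using (var buffer = new BlockingCollection<int>(16))
        {
            // Each consumer runs its own foreach over the same collection.
            // Every item is handed to exactly one of them.
            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (int item in buffer.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, item); // thread-safe update
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            for (int i = 1; i <= itemCount; i++)
            {
                buffer.Add(i);
            }
            buffer.CompleteAdding(); // ends every consumer's foreach
            Task.WaitAll(consumers);
        }
        return total;
    }
}
```

MultiConsumerDemo.SumWithConsumers(100, 4) returns 5050 regardless of how the items happen to be distributed among the four consumers. Anything the consumers share, such as the running total here, has to be updated in a thread-safe way.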
I would read the following articles; they describe your problem. Basically, you're not getting the right isolation for your unit of work.
Link Link
You can get what you want using a queue and timer. The producer adds values to the queue and starts the consumer timer. The consumer timer's elapsed event (which is on a Threadpool thread) stops the timer, and loops through the queue until it's empty, then disappears (no unnecessary polling). The producer can add to the queue while the consumer is still running.
System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();

void Producer()
{
    consumerTimer = new System.Timers.Timer(1000);
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        lock (queue)
        {
            queue.Enqueue(magic.Produce());
            if (!consumerTimer.Enabled)
            {
                consumerTimer.Start();
            }
        }
    }
}
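void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    // Stop the timer once so it does not keep firing while the queue is drained.
    consumerTimer.Stop();
    while (true)
    {
        byte[] item;
        lock (queue)
        {
            if (queue.Count == 0)
            {
                break;
            }
            item = queue.Dequeue();
        }
        // Process outside the lock so the producer can keep enqueuing meanwhile.
        // Note: if the producer restarts the timer during a long drain, a second
        // handler may run concurrently with this one.
        DoSomething(item);
    }
}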
I use mutexes. The idea is that both run in different threads. The Consumer thread starts out blocked on a mutex, where it will sit indefinitely until released by the Producer. It will then process data in parallel, leaving the Producer to continue. The Consumer will re-lock when complete.
(Code to start the thread, and other quality bits, have been omitted for brevity.)
// Pre-create mutex owned by Producer thread, then start Consumer thread.
Mutex mutex = new Mutex(true);
Queue<T> queue = new Queue<T>();

void Producer_AddData(T data)
{
    lock (queue)
    {
        queue.Enqueue(data);
    }
    // Release mutex to start thread:
    mutex.ReleaseMutex();
    mutex.WaitOne();
}

void Consumer()
{
    while (true)
    {
        // Wait indefinitely on mutex
        mutex.WaitOne();
        mutex.ReleaseMutex();
        T data;
        lock (queue)
        {
            data = queue.Dequeue();
        }
        DoSomething(data);
    }
}
This slows the Producer by a few milliseconds while it waits for the Consumer to wake and release the mutex, if you can live with that.
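For comparison, the same wake-the-consumer idea can be sketched with the AutoResetEvent from the question instead of a Mutex. This is a swapped-in primitive, not the code above: the class SignalledQueue and its members are hypothetical names for the example. Because several Set calls made in quick succession collapse into a single signal, the consumer drains the whole queue each time it wakes rather than taking one item per signal.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper; the names are illustrative only.
public sealed class SignalledQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly AutoResetEvent dataAvailable = new AutoResetEvent(false);
    private bool completed;

    // Called by the producer: returns immediately after queuing the item.
    public void Enqueue(T item)
    {
        lock (queue) { queue.Enqueue(item); }
        dataAvailable.Set(); // wake the consumer if it is waiting
    }

    // Called by the producer when no more items will arrive.
    public void Complete()
    {
        lock (queue) { completed = true; }
        dataAvailable.Set();
    }

    // Runs on the consumer thread. Returns once Complete() has been
    // called and every queued item has been handled.
    public void ConsumeAll(Action<T> handler)
    {
        while (true)
        {
            T item = default(T);
            bool hasItem;
            bool done;
            lock (queue)
            {
                hasItem = queue.Count > 0;
                if (hasItem) { item = queue.Dequeue(); }
                done = !hasItem && completed;
            }
            if (hasItem)
            {
                handler(item); // slow work happens outside the lock
                continue;
            }
            if (done) { return; }
            // Queue is empty: sleep until the producer signals. A Set() that
            // happened after the check above is remembered, so it is not lost.
            dataAvailable.WaitOne();
        }
    }
}
```

A producer thread would call Enqueue for each item and Complete at the end, while the consumer thread runs ConsumeAll(DoSomething). Unlike the mutex version, the producer never waits for the consumer to wake.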