I have two threads, one thread processes a开发者_Python百科 queue and the other thread adds stuff into the queue.
- I want to put the queue processing thread to sleep when its finished processing the queue
- I want to have the 2nd thread tell it to wake up when it has added an item to the queue
However these functions call System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code
on the Monitor.PulseAll(waiting);
call, because I havent synchronized the function with the waiting object. [which I dont want to do, i want to be able to process while adding items to the queue]. How can I achieve this?
Queue<object> items = new Queue<object>();
object waiting = new object();
1st Thread
public void ProcessQueue()
{
while (true)
{
if (items.Count == 0)
Monitor.Wait(waiting);
object real = null;
lock(items) {
object item = items.Dequeue();
real = item;
}
if(real == null)
continue;
.. bla bla bla
}
}
2nd Thread involves
public void AddItem(object o)
{
... bla bla bla
lock(items)
{
items.Enqueue(o);
}
Monitor.PulseAll(waiting);
}
The answer is in the error message you posted: "Object synchronization method was called from an unsynchronized block of code on the Monitor.PulseAll(waiting);"
You have to call Monitor.PulseAll(waiting) from inside the lock(waiting) block.
Also... you have to call Monitor.Wait from within a lock block as well.
If you have access to .NET 4.0, what you want to do can be achieved by BlockingCollection<T>.
If you want to do it yourself by means of the Monitor
class and signaling with Pulse()
, you are actually on the right track.
You get the exception because to call Wait()
, Pulse()
and PulseAll()
, you have to own the lock
on the specified object. You happen to miss this on waiting
.
A sample basic thread-safe queue that can be used:
- with
foreach
on the consumer, - with
while
or your favorite conditional construct on the producer side, - handles multiple producers/consumers and
- uses
lock()
,Monitor.Pulse()
,Monitor.PulseAll()
andMonitor.Wait()
:
.
public class SignaledQueue<T>
{
Queue<T> queue = new Queue<T>();
volatile bool shutDown = false;
public bool Enqueue(T item)
{
if (!shutDown)
{
lock (queue)
{
queue.Enqueue(item);
//Pulse only if there can be waiters.
if (queue.Count == 1)
{
Monitor.PulseAll(queue);
}
}
return true;
}
//Indicate that processing should stop.
return false;
}
public IEnumerable<T> DequeueAll()
{
while (!shutDown)
{
do
{
T item;
lock (queue)
{
//If the queue is empty, wait.
if (queue.Count == 0)
{
if (shutDown) break;
Monitor.Wait(queue);
if (queue.Count == 0) break;
}
item = queue.Dequeue();
}
yield return item;
} while (!shutDown);
}
}
public void SignalShutDown()
{
shutDown = true;
lock (queue)
{
//Signal all waiting consumers with PulseAll().
Monitor.PulseAll(queue);
}
}
}
Sample usage:
class Program
{
static void Main(string[] args)
{
int numProducers = 4, numConsumers = 2;
SignaledQueue<int> queue = new SignaledQueue<int>();
ParameterizedThreadStart produce = delegate(object obj)
{
Random rng = new Random((int)obj);
int num = 0;
while (queue.Enqueue(++num))
{
Thread.Sleep(rng.Next(100));
}
};
ThreadStart consume = delegate
{
foreach (int num in queue.DequeueAll())
{
Console.Write(" {0}", num);
}
};
Random seedRng = new Random();
for (int i = 0; i < numProducers; i++)
{
new Thread(produce).Start(seedRng.Next());
}
for (int i = 0; i < numConsumers; i++)
{
new Thread(consume).Start();
}
Console.ReadKey(true);
queue.SignalShutDown();
}
}
Use Semaphore http://msdn.microsoft.com/library/system.threading.semaphore.aspx it was designed exactly for this
I prefer to use a callback that launches a processing thread that continues until it's caught up, with locks causing simultaneous readers and writers to wait in line:
public delegate void CallbackDelegate();
class Program
{
static void Main(string[] args)
{
Queue<object> items = new Queue<object>();
Processor processor = new Processor(items);
Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack));
Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem));
object objectToAdd = new object();
addThread.Start(objectToAdd);
}
}
class Processor
{
Queue<object> items;
public Processor(Queue<object> itemsArg)
{
items = itemsArg;
}
public void ProcessQueue()
{
lock (items)
{
while (items.Count > 0)
{
object real = items.Dequeue();
// process real
}
}
}
public void CallBack()
{
Thread processThread = new Thread(ProcessQueue);
processThread.IsBackground = true;
processThread.Start();
}
}
class Adder
{
Queue<object> items;
CallbackDelegate callback;
public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg)
{
items = itemsArg;
callback = callbackArg;
}
public void AddItem(object o)
{
lock (items) { items.Enqueue(o); }
callback();
}
}
精彩评论