I followed this tutorial to create a priority queue and wrapped it in a BlockingCollection. I have a DataGrid bound to the underlying priority queue, which raises change events. I can add items to the collection from the UI thread without a hitch, and it blocks when the buffer is full, as it's supposed to.
Now how do I consume the items? Here's what I've got:
public DownloadViewModel()
{
    Queue = new ConcurrentPriorityQueue<DownloadItem>(10);
    Buffer = new BlockingCollection<KeyValuePair<int, DownloadItem>>(Queue, 10000);
    Task.Factory.StartNew(() =>
    {
        KeyValuePair<int, DownloadItem> item;
        while (!Buffer.IsCompleted)
        {
            if (Buffer.TryTake(out item))
            {
                // do something with the item
            }
            Thread.SpinWait(100000);
        }
    });
}
But as soon as I added that Task.Factory.StartNew bit, my app suddenly takes 30 seconds before the window appears (before, it was instant), and when I do add an item I get the exception:
This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread.
Which I understand, but is it really necessary to take the items using the UI thread? Doesn't that defeat the whole purpose of using this BlockingCollection? I want to create 4 or 8 consumers and have them run in parallel.
How is this supposed to be done?
Wrapping the CollectionChanged event with a dispatcher seems to work pretty well...
public bool TryAdd(KeyValuePair<int, T> item)
{
    int pos = _queues.Take(item.Key + 1).Sum(q => q.Count);
    _queues[item.Key].Enqueue(item.Value);
    Interlocked.Increment(ref _count);
    Dispatcher.BeginInvoke(
        new Action(
            () =>
            NotifyCollectionChanged(
                new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item, pos))
        ));
    return true;
}
Just had to derive my ConcurrentPriorityQueue from DispatcherObject. I think this is how it's supposed to be done.
Easier yet, just write the NotifyCollectionChanged method like this:
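private void NotifyCollectionChanged(NotifyCollectionChangedEventArgs e)
{
    // Copy the delegate to a local instead of locking on it: lock (CollectionChanged)
    // throws when there are no subscribers, and a handler could be removed
    // between the null check and the call.
    var handler = CollectionChanged;
    if (handler != null)
        Dispatcher.BeginInvoke(new Action(() => handler(this, e)));
}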
And then you don't have to litter your other methods with BeginInvoke.
[After commenting on the question, then]
You don't need to "take the items using the UI thread". However, any updates to the UI as a result of processing the item in the consuming task need to be dispatched to the UI thread. Separate your concerns!
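To make that separation concrete, here is a rough sketch of several consumers running in parallel, assuming the buffer is a BlockingCollection as in the question. The names ConsumerSketch, StartConsumers, process, postToUi and onProcessed are made up for illustration; postToUi stands in for whatever marshals a callback to the UI thread (in WPF, something like a => Application.Current.Dispatcher.BeginInvoke(a)). It uses GetConsumingEnumerable, which blocks while the buffer is empty rather than spinning the way the TryTake/SpinWait loop does. Whether that spinning explains the 30-second startup is only a guess. The collection's own CollectionChanged notifications still have to be dispatched as in the snippets above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConsumerSketch
{
    // Starts consumerCount background consumers over buffer.
    // process runs on a worker thread; postToUi is a hypothetical hook that
    // marshals a callback to the UI thread; onProcessed is the UI-side update.
    public static Task[] StartConsumers<T>(
        BlockingCollection<T> buffer,
        int consumerCount,
        Action<T> process,
        Action<Action> postToUi,
        Action<T> onProcessed)
    {
        var tasks = new Task[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                // Blocks while the buffer is empty and finishes once
                // CompleteAdding() has been called and the buffer has drained.
                foreach (T item in buffer.GetConsumingEnumerable())
                {
                    process(item);                          // worker thread
                    T captured = item;                      // safe capture for the closure
                    postToUi(() => onProcessed(captured));  // UI thread
                }
            }, TaskCreationOptions.LongRunning);
        }
        return tasks;
    }
}
```

In the view model this might be called as ConsumerSketch.StartConsumers(Buffer, 4, ...), with the download work passed as process and only the grid or status updates passed as onProcessed. Outside WPF, for example in a console test, postToUi can simply be a => a().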