I was working on incorporating threads into my Azure code for putting things on a queue. To do this I used http://www.microsoft.com/download/en/details.aspx?id=19222 as a reference.
My code to enqueue multiple messages looks like this:
    public void AddMessagesAsync(IEnumerable<IQueueMessage> messages, string queue = null, TimeSpan? timeToLive = null)
    {
        //check if we need to switch queues
        if (!String.IsNullOrEmpty(queue))
        {
            SetCurrent(queue);
        }
        //setup list of messages to enqueue
        var tasks = new List<Task>();
        Parallel.ForEach(messages, current => {
            if (timeToLive.HasValue)
            {
                //create task with TPL
                var task = Task.Factory.FromAsync(Current.BeginAddMessage, Current.EndAddMessage, Convert(current), timeToLive.Value, tasks);
                //setup continuation to trigger eventhandler
                tasks.Add(task.ContinueWith((t) => AddMessageCompleted(t)));
            }
            else
            {
                //create task with TPL
                var task = Task.Factory.FromAsync(Current.BeginAddMessage, Current.EndAddMessage, Convert(current), tasks);
                //setup continuation to trigger eventhandler
                tasks.Add(task.ContinueWith((t) => AddMessageCompleted(t)));
            }
        });
        //setup handler to trigger when all messages are enqueued, as we are blocking the thread over there to wait for all the threads to complete
        Task.Factory.ContinueWhenAll(tasks.ToArray(), (t) => AddMessagesCompleted(t));
    }
    private void AddMessagesCompleted(Task[] tasks)
    {
        try
        {
            //wait for all tasks to complete
            Task.WaitAll(tasks);
        }
        catch (AggregateException e)
        {
            //log the exception
            var ex = e;
            //return ex;
        }
        if (AddedMessages != null)
        {
            AddedMessages(tasks, EventArgs.Empty);
        }
    }
Now my question is about the Task.WaitAll in the continuation (which follows the document provided by MS). It seems a bit strange to wait for tasks that you already know have completed, right? The only reason I can imagine is to bubble up the errors and process them. Am I missing something here?
Task.WaitAll() will throw an AggregateException when at least one of the Task instances was canceled, or when an exception was thrown during the execution of at least one of the Task instances. ContinueWhenAll() will not throw this exception; it simply starts your continuation task once all the tasks have finished, whether they were canceled, faulted, or successful.
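The difference can be seen in a small stand-alone sketch. It does not use the Azure queue at all: the failing task is faked with a TaskCompletionSource, and the names WaitAllDemo and ContinuationObservesFailure are made up for the illustration.

```csharp
using System;
using System.Threading.Tasks;

static class WaitAllDemo
{
    // Returns true when WaitAll rethrows the antecedent's failure
    // inside a continuation that ran despite that failure.
    public static bool ContinuationObservesFailure()
    {
        Task ok = Task.Factory.StartNew(() => { });

        // Stand-in for an enqueue operation that failed.
        var failing = new TaskCompletionSource<object>();
        failing.SetException(new InvalidOperationException("enqueue failed"));

        Task<bool> continuation = Task.Factory.ContinueWhenAll<bool>(
            new Task[] { ok, failing.Task },
            completed =>
            {
                try
                {
                    // Does not block: every task has already finished.
                    Task.WaitAll(completed);
                    return false;
                }
                catch (AggregateException e)
                {
                    // The failure surfaces here, wrapped in an AggregateException.
                    return e.InnerExceptions.Count == 1
                        && e.InnerExceptions[0] is InvalidOperationException;
                }
            });

        return continuation.Result;
    }

    static void Main()
    {
        Console.WriteLine(ContinuationObservesFailure());
    }
}
```

So the WaitAll call in the continuation is not really waiting; it is a convenient way to collect every failure into one AggregateException.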
First, I've noticed you are using List<T> with Parallel.ForEach. List<T> is not thread safe, so you should replace it with a concurrent collection, e.g. ConcurrentQueue<T>.
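A minimal sketch of that change, assuming the only goal is to collect the tasks safely from inside Parallel.ForEach. The class name ConcurrentCollectDemo, the CollectTasks helper, and the empty task body are placeholders, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class ConcurrentCollectDemo
{
    // Collects one task per input item from inside Parallel.ForEach.
    public static Task[] CollectTasks(int count)
    {
        var tasks = new ConcurrentQueue<Task>();
        Parallel.ForEach(Enumerable.Range(0, count), i =>
        {
            // Placeholder for the real FromAsync(...).ContinueWith(...) chain.
            Task task = Task.Factory.StartNew(() => { });

            // Enqueue may be called from several threads at once,
            // which List<T>.Add does not allow.
            tasks.Enqueue(task);
        });
        return tasks.ToArray();
    }

    static void Main()
    {
        Task[] tasks = CollectTasks(1000);
        Task.WaitAll(tasks);
        Console.WriteLine(tasks.Length);
    }
}
```

ToArray() on the queue yields the Task[] that ContinueWhenAll expects, so the rest of the method can stay as it is.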
Regarding WaitAll vs ContinueWhenAll: WaitAll will throw if any of the tasks is faulted or canceled, so the code above uses it to verify that all the tasks completed successfully. Note that you cannot get the same effect by passing a TaskContinuationOptions value such as OnlyOnRanToCompletion to ContinueWhenAll, because the OnlyOn* and NotOn* members are not supported there; the check has to happen inside the continuation, either with WaitAll or by inspecting each task's status.
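One caveat worth checking before relying on continuation options here: as far as I know, ContinueWhenAll rejects the OnlyOn* and NotOn* members of TaskContinuationOptions with an ArgumentOutOfRangeException. The sketch below shows that, together with an alternative that inspects each task's Status inside the continuation. StatusCheckDemo and its helpers are illustrative names only.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class StatusCheckDemo
{
    // True only if every antecedent finished without faulting or being canceled.
    public static bool AllSucceeded(Task[] completed)
    {
        return completed.All(t => t.Status == TaskStatus.RanToCompletion);
    }

    // True if ContinueWhenAll refuses an OnlyOn* option.
    public static bool RejectsOnlyOnOption(Task[] tasks)
    {
        try
        {
            Task.Factory.ContinueWhenAll(tasks, completed => { },
                TaskContinuationOptions.OnlyOnRanToCompletion);
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }

    static void Main()
    {
        // Stand-in for an enqueue operation that failed.
        var failed = new TaskCompletionSource<object>();
        failed.SetException(new InvalidOperationException("enqueue failed"));
        Task ok = Task.Factory.StartNew(() => { });
        var tasks = new Task[] { ok, failed.Task };

        Task<bool> summary = Task.Factory.ContinueWhenAll<bool>(
            tasks, completed => AllSucceeded(completed));
        Console.WriteLine(summary.Result);

        // Reading Exception marks the failure as observed.
        AggregateException observed = failed.Task.Exception;

        Console.WriteLine(RejectsOnlyOnOption(tasks));
    }
}
```

Checking Status avoids the try/catch, but unlike WaitAll it does not hand you the exceptions, so read each faulted task's Exception property if you need to log them.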