I'm creating a dispatcher class - which is itself a long running task which can be cancelled at anytime by the user. This Task will poll the database to see if there is any work that needs to be done, and run up to X [5] # of child tasks.
As far as I can tell - it's working great, but I have a few questions/concerns about the code. More or less -- since I couldn't find another example of this -- am I doing it right ? Are there things that I could improve ?
- I'm using a ConcurrentDictionary to keep track of the child tasks that are running. This dictionary stores the RequestKey that's being processed, and the CancellationTokenSource for that task.
Q: Is this the best way to do this ? In the StartDownloadProcess (this is the child task) I'm creating the CancellationTokenSource and adding it to the dictionary, then starting the task. I've added a Continuation to it which then removes the item from the Dictionary when processing is completed so that it doesn't get called in the Cancel method.
In the child task I'm passing the Cancellation Token to the method that actually does the work. That process will then check to see if it needs to abort by checking that token periodically. Is this correct ?
In the Cancel method - I'm creating a copy of the Keys in the Dictionary, iterating over it and trying to access and remove the item from the dictionary and issuing a Cancel request.
Q: Is this the best way to do this ? Do I need to wait to see if the task actually cancelled ? Can I ?
Q: Should I be disposing of the CTS ?- I'm doing a Thread.Sleep in the main task.. good/bad ? Should I use SpinWait instead ? Is there another method/better way to have the primary poller go to sleep and re-run at particular intervals ?
Note: In the StartDownloadProcess I'm using a while(true) to loop until the task completes, or is cancelled to iterate until j > requestKey. In the real code there would be no while loop. It would simply start the new task and run the actual download process.
--
/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();
/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
// Only one dispatcher can be running
if (IsRunning)
return;
// Create a new token source
primaryTokenSource = new CancellationTokenSource();
// Create the cancellation token to pass into the Task
CancellationToken token = primaryTokenSource.Token;
// Set flag on
IsRunning = true;
// Fire off the dispatcher
Task.Factory.StartNew(
() => {
// Loop forever
while (true) {
// If there are more than 5 threads running, don't add a new one
if (workerTokens.Count < 5) {
// Check to see if we've been cancelled
if (token.IsCancellationRequested)
return;
// Check to see if there are pending requests
int? requestKey = null;
// Query database (removed)
requestKey = new Random().Next(1550);
// If we got a request, start processing it
if (requestKey != null) {
// Check to see if we've been 开发者_运维技巧cancelled before running the child task
if (token.IsCancellationRequested)
return;
// Start the child downloader task
StartDownloadProcess(requestKey.Value);
}
} else {
// Do nothing, we've exceeded our max tasks
Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
}
// Sleep for the alloted time
Thread.Sleep(Properties.Settings.Default.PollingInterval);
}
}, token)
// Turn running flag off
.ContinueWith((t) => IsRunning = false)
// Notify that we've finished
.ContinueWith(OnDispatcherStopped);
}
/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
CancellationTokenSource workerTokenSource = new CancellationTokenSource();
CancellationToken token = workerTokenSource.Token;
// Add the token source to the queue
workerTokens.GetOrAdd(requestKey, workerTokenSource);
// Start the child downloader task
Task.Factory.StartNew(
() => {
int j = 0;
while (true) {
if (token.IsCancellationRequested) {
Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
return;
}
// Create a new downloader, pass it the RequestKey and token
//var downloader = new Downloader(requestKey, token);
//downloader.Run();
// Simulate work
Thread.Sleep(250);
Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);
// Simulate - automatically end task when j > requestkey
if (j++ > requestKey) {
Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
return;
}
}
},
token
).ContinueWith((t) => {
// If we ended naturally, the cancellationtoken will need to be removed from the dictionary
CancellationTokenSource source = null;
workerTokens.TryRemove(requestKey, out source);
});
}
/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
// Cancel the primary task first so new new child tasks are created
if (primaryTokenSource != null)
primaryTokenSource.Cancel();
// Iterate over running cancellation sources and terminate them
foreach (var item in workerTokens.Keys.ToList()) {
CancellationTokenSource source = null;
if (workerTokens.TryRemove(item, out source)) {
source.Cancel();
}
}
}
Additionally, not shown in the example above.. several events are also able to be raised using within the tasks... those events all look like the following:
public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
EventHandler handler = DispatcherStarted;
if (handler != null)
Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();
}
In the Run() method - at various points it would call OnDispatcher*(); to raise the events so the caller could subscribe and be notified. Those tasks that the event creates would run on the primary thread.
- Bonus Question: I was looking at making the dispatcher generic and passing in the "poller" object which checks the database.. and if successful, creates a child task and passes in the parameter(s) that it needs. I ran into some issues like.. how to pass data around, what objects to pass in.. Interfaces/Classes/Func<,,,>/Action<> etc. How could I turn this into a generic dispatcher/poller that runs A which returns parameters (I was thinking a Dictionary) which then creates a child task B which uses those parameters and supports cancellation and event notification ?
I quickly looked throw the code and have few comments:
- The usage of IsRunning flag is not thread safe, multiple threads can rea it as false, then set it concurrently to true and you will have more than one dispatcher thread!, to avoid that you have to use Interlocked.CompareExchange to set it, and also you need to mark it as voaltile.
- I'd recommend not to use Sleep, also SpinWait will not be helpful here, you might use a Timer object that pools the database, and adds the requests to a BlockingCollection which the dispatcher clas consumes the requests from.
- The child task continuation wil always be executed even if the parent task is cancelled, you can avoid that by passing this TaskContinuationOptions.NotOnCanceled
精彩评论