Properly creating a Task to poll and dispatch child tasks with cancellation for each

Developer https://www.devze.com 2023-03-30 16:52 Source: Internet
I'm creating a dispatcher class, which is itself a long-running task that can be cancelled at any time by the user. This task will poll the database to see if there is any work that needs to be done, and run up to X (currently 5) child tasks.

As far as I can tell it's working great, but I have a few questions/concerns about the code. More or less, since I couldn't find another example of this: am I doing it right? Are there things that I could improve?

  1. I'm using a ConcurrentDictionary to keep track of the child tasks that are running. This dictionary maps the RequestKey that's being processed to the CancellationTokenSource for that task.

Q: Is this the best way to do this? In StartDownloadProcess (which starts the child task) I'm creating the CancellationTokenSource and adding it to the dictionary, then starting the task. I've added a continuation to it which removes the item from the dictionary when processing is completed, so that it doesn't get cancelled in the Cancel method.

  2. In the child task I'm passing the CancellationToken to the method that actually does the work. That process will then check whether it needs to abort by checking that token periodically. Is this correct?

  3. In the Cancel method I'm creating a copy of the Keys in the dictionary, iterating over it, trying to remove each item from the dictionary, and issuing a Cancel request on it.

Q: Is this the best way to do this? Do I need to wait to see if the task actually cancelled? Can I?

Q: Should I be disposing of the CTS?

  4. I'm doing a Thread.Sleep in the main task. Good or bad? Should I use SpinWait instead? Is there a better way to have the primary poller go to sleep and re-run at particular intervals?

Note: In StartDownloadProcess I'm using a while (true) loop that runs until the task is cancelled or until j > requestKey, purely to simulate work. In the real code there would be no while loop. It would simply start the new task and run the actual download process.

--

/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();

/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
  //  Only one dispatcher can be running
  if (IsRunning)
    return;

  //  Create a new token source
  primaryTokenSource = new CancellationTokenSource();
  //  Create the cancellation token to pass into the Task
  CancellationToken token = primaryTokenSource.Token;

  //  Set flag on
  IsRunning = true;

  //  Fire off the dispatcher
  Task.Factory.StartNew(
    () => {
      //  Loop forever
      while (true) {
        //  Only start a new worker if fewer than 5 are currently running
        if (workerTokens.Count < 5) {
          //  Check to see if we've been cancelled
          if (token.IsCancellationRequested)
            return;

          //  Check to see if there are pending requests
          int? requestKey = null;

          //  Query database (removed)
          requestKey = new Random().Next(1550);

          //  If we got a request, start processing it
          if (requestKey != null) {
            //  Check to see if we've been cancelled before running the child task
            if (token.IsCancellationRequested)
              return;

            //  Start the child downloader task
            StartDownloadProcess(requestKey.Value);
          }
        } else {
          //  Do nothing, we've exceeded our max tasks
          Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
        }

        //  Sleep for the allotted time
        Thread.Sleep(Properties.Settings.Default.PollingInterval);
      }
    }, token)
  //  Turn running flag off
  .ContinueWith((t) => IsRunning = false)
  //  Notify that we've finished
  .ContinueWith(OnDispatcherStopped);
}

/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
  CancellationTokenSource workerTokenSource = new CancellationTokenSource();
  CancellationToken token = workerTokenSource.Token;

  //  Add the token source to the queue
  workerTokens.GetOrAdd(requestKey, workerTokenSource);

  //  Start the child downloader task
  Task.Factory.StartNew(
    () => {
      int j = 0;
      while (true) {
        if (token.IsCancellationRequested) {
          Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
          return;
        }

        //  Create a new downloader, pass it the RequestKey and token
        //var downloader = new Downloader(requestKey, token);
        //downloader.Run();

        //  Simulate work
        Thread.Sleep(250);
        Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);

        //  Simulate - automatically end task when j > requestkey
        if (j++ > requestKey) {
          Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
          return;
        }
      }
    },
    token
  ).ContinueWith((t) => {
    //  If we ended naturally, the cancellationtoken will need to be removed from the dictionary
    CancellationTokenSource source = null;
    workerTokens.TryRemove(requestKey, out source);
  });
}

/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
  //  Cancel the primary task first so no new child tasks are created
  if (primaryTokenSource != null)
    primaryTokenSource.Cancel();

  //  Iterate over running cancellation sources and terminate them
  foreach (var item in workerTokens.Keys.ToList()) {
    CancellationTokenSource source = null;
    if (workerTokens.TryRemove(item, out source)) {
      source.Cancel();
    }
  }
}

Additionally, not shown in the example above, several events can also be raised from within the tasks. Those events all look like the following:

public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
  EventHandler handler = DispatcherStarted;
  if (handler != null) 
    Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();      
}

In the Run() method, at various points it would call OnDispatcher*() to raise the events so the caller could subscribe and be notified. The tasks that raise those events would run on the primary thread.

  • Bonus Question: I was looking at making the dispatcher generic and passing in the "poller" object which checks the database and, if successful, creates a child task and passes in the parameter(s) that it needs. I ran into some issues, such as how to pass data around and what objects to pass in (interfaces, classes, Func<,,,>, Action<>, etc.). How could I turn this into a generic dispatcher/poller that runs A, which returns parameters (I was thinking a Dictionary), and then creates a child task B which uses those parameters and supports cancellation and event notification?


I quickly looked through the code and have a few comments:

  • The usage of the IsRunning flag is not thread safe: multiple threads can read it as false, then set it concurrently to true, and you will have more than one dispatcher thread! To avoid that, set it with Interlocked.CompareExchange (typically on an int backing field), and also mark it as volatile.
  • I'd recommend not using Sleep, and SpinWait will not be helpful here either. You might use a Timer object that polls the database and adds the requests to a BlockingCollection, from which the dispatcher class consumes the requests.
  • The child task continuation will always be executed, even if the task it continues is cancelled. You can avoid that by passing TaskContinuationOptions.NotOnCanceled.
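
To make the first two points concrete, here is a minimal sketch, assuming .NET 4.5 or later (for Volatile). DispatcherSketch, TryStart, and the poll/work delegates are hypothetical names standing in for your class, your database query, and your downloader. For brevity the work runs inline on the dispatcher thread; in your code you would start the child task there instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: every name here is hypothetical, not taken from the question.
public sealed class DispatcherSketch
{
    private int running; // 0 = stopped, 1 = running (an int so Interlocked can swap it)
    private CancellationTokenSource cts;

    public bool IsRunning { get { return Volatile.Read(ref running) == 1; } }

    // Returns a task that completes once the dispatcher has stopped and
    // cleaned up, or null if a dispatcher is already running.
    public Task TryStart(Func<int?> poll, Action<int, CancellationToken> work, TimeSpan interval)
    {
        // Atomically flip 0 -> 1; only one caller can win this race.
        if (Interlocked.CompareExchange(ref running, 1, 0) != 0)
            return null;

        cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;
        var queue = new BlockingCollection<int>();

        // The timer does the polling and only enqueues request keys.
        // Note: callbacks may overlap if poll() takes longer than the interval.
        var timer = new Timer(_ =>
        {
            int? key = poll();
            if (key.HasValue)
                queue.Add(key.Value);
        }, null, TimeSpan.Zero, interval);

        Task dispatcher = Task.Factory.StartNew(() =>
        {
            // Blocks without sleeping until a key arrives; throws
            // OperationCanceledException once the token is cancelled.
            foreach (int key in queue.GetConsumingEnumerable(token))
                work(key, token);
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        // Cleanup has to run however the dispatcher ended, so this
        // continuation deliberately does not use NotOnCanceled.
        return dispatcher.ContinueWith(t =>
        {
            var observed = t.Exception; // mark any fault as observed
            timer.Dispose();
            Volatile.Write(ref running, 0);
        }, TaskContinuationOptions.ExecuteSynchronously);
    }

    public void Cancel()
    {
        CancellationTokenSource source = cts;
        if (source != null)
            source.Cancel();
    }
}
```

Because the flag is only ever changed through Interlocked/Volatile, a second call to TryStart while the first dispatcher is alive simply returns null. The queue is intentionally not disposed, since a late timer callback may still add to it after cancellation.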