EDIT: It kind of occurred to me too late (?) that all the code I posted in my first update to this question was way too much for most readers. I've actually gone ahead and written a blog post about this topic for anyone who cares to read it.
In the meantime, I've left the original question in place, to give a brief glimpse at the problem I'd like to solve.
I'll also just note that the code I have posted (on my blog) has, thus far, stood up pretty well to testing. But I'm still interested in any and all feedback people are willing to give me on how clean/robust/performant* it is.
*I love how that word doesn't really mean what we think, but we developers use it all the time anyway.
Original Question
Sorry for the vague question title -- not sure how to encapsulate what I'm asking below succinctly. (If someone with editing privileges can think of a more descriptive title, feel free to change it.)
The behavior I need is this. I am envisioning a worker class that accepts a single delegate task in its constructor (for simplicity, I would make it immutable -- no more tasks can be added after instantiation). I'll call this task `T`. The class should have a simple method, something like `GetToWork`, that will exhibit this behavior:

1. If the worker is not currently running `T`, then it will start doing so right now.
2. If the worker is currently running `T`, then once it is finished, it will start `T` again immediately.

`GetToWork` can be called any number of times while the worker is running `T`; the simple rule is that, during any execution of `T`, if `GetToWork` was called at least once, `T` will run again upon completion (and then if `GetToWork` is called while `T` is running that time, it will repeat itself again, etc.).
Now, this is pretty straightforward with a boolean switch. But this class needs to be thread-safe, by which I mean, steps 1 and 2 above need to comprise atomic operations (at least I think they do).
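For illustration, here is a minimal sketch of how that atomic check-and-set could look. The class name `RepeatingWorker` and all of its members are hypothetical names of my own, not from any existing library; the lock just makes the "running or not?" decision atomic, which is the part a bare boolean switch gets wrong:

```csharp
using System;
using System.Threading;

// Hypothetical sketch: GetToWork() either starts the task now (rule 1)
// or flags it to run exactly once more after the current run (rule 2).
public sealed class RepeatingWorker
{
    private readonly Action _task;
    private readonly object _gate = new object();
    private bool _running;  // the task is currently executing
    private bool _flagged;  // GetToWork was called during execution

    public RepeatingWorker(Action task)
    {
        _task = task;
    }

    public void GetToWork()
    {
        lock (_gate)
        {
            if (_running)
            {
                _flagged = true;  // rule 2: queue one more run
                return;
            }
            _running = true;      // rule 1: claim the worker atomically
        }
        ThreadPool.QueueUserWorkItem(_ => RunLoop());
    }

    private void RunLoop()
    {
        while (true)
        {
            _task();
            lock (_gate)
            {
                if (!_flagged)
                {
                    _running = false;  // nobody asked again; go idle
                    return;
                }
                _flagged = false;      // consume the flag and run again
            }
        }
    }
}
```

Note that any number of `GetToWork` calls during one execution collapse into a single follow-up run, which matches the rule above.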
There is an added layer of complexity. I have need of a "worker chain" class that will consist of many of these workers linked together. As soon as the first worker completes, it essentially calls `GetToWork` on the worker after it; meanwhile, if its own `GetToWork` has been called, it restarts itself as well. Logically, calling `GetToWork` on the chain is essentially the same as calling `GetToWork` on the first worker in the chain (I would fully intend that the chain's workers not be publicly accessible).
One way to imagine how this hypothetical "worker chain" would behave is by comparing it to a team in a relay race. Suppose there are four runners, `W1` through `W4`, and let the chain be called `C`. If I call `C.StartWork()`, what should happen is this:

- If `W1` is at his starting point (i.e., doing nothing), he will start running towards `W2`.
- If `W1` is already running towards `W2` (i.e., executing his task), then once he reaches `W2`, he will signal to `W2` to get started, immediately return to his starting point and, since `StartWork` has been called, start running towards `W2` again.
- When `W1` reaches `W2`'s starting point, he'll immediately return to his own starting point.
- If `W2` is just sitting around, he'll start running immediately towards `W3`.
- If `W2` is already off running towards `W3`, then `W2` will simply go again once he's reached `W3` and returned to his starting point.
The above is probably a little convoluted and written out poorly. But hopefully you get the basic idea. Obviously, these workers will be running on their own threads.
Also, I guess it's possible this functionality already exists somewhere? If that's the case, definitely let me know!
Use semaphores. Each worker is a thread with the following code (pseudocode):
WHILE (TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID)   // the semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID)  // the semaphore for the next worker
END
A non-zero semaphore means that someone has signaled the current thread to do the work. When a worker's wait on its semaphore returns, it resets the semaphore (marking it as "no one has signaled"), does the work (during which the semaphore can be posted again), and then posts the semaphore for the next worker. The story repeats in the next worker(s).
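Translated into C#, that loop might be sketched as follows with `SemaphoreSlim`. The two-worker setup and all names here are illustrative, not from any existing class; the drain loop with `Wait(0)` plays the role of the RESET step by collapsing any extra posts into a single pending run:

```csharp
using System;
using System.Threading;

// Illustrative translation of the pseudocode: each worker blocks on its
// own semaphore, does its work, then posts the next worker's semaphore.
public static class SemaphoreChainDemo
{
    public static int[] Run()
    {
        SemaphoreSlim[] signals = { new SemaphoreSlim(0), new SemaphoreSlim(0) };
        int[] runs = new int[2];  // how many times each worker has run

        for (int i = 0; i < signals.Length; i++)
        {
            int id = i;
            var t = new Thread(() =>
            {
                while (true)
                {
                    signals[id].Wait();              // WAIT_FOR_SEMAPHORE(WORKER_ID)
                    while (signals[id].Wait(0)) { }  // RESET: collapse extra posts
                    runs[id]++;                      // /* DO WORK */
                    Thread.Sleep(50);
                    if (id + 1 < signals.Length)
                        signals[id + 1].Release();   // POST_SEMAPHORE(NEXT_WORKER_ID)
                }
            });
            t.IsBackground = true;
            t.Start();
        }

        signals[0].Release();  // signal the first worker once
        Thread.Sleep(400);     // give both workers time to run
        return runs;
    }

    public static void Main()
    {
        int[] runs = Run();
        Console.WriteLine("runs: {0}, {1}", runs[0], runs[1]);
    }
}
```

One `Release` on the first semaphore should ripple down the chain so that each worker runs once.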
A naive implementation that you may get some mileage from.
Note:
It is my understanding that scalar types (i.e., the bool flags controlling execution here) have atomic assignment, making them about as thread-safe as you would need/want in this scenario. (Strictly speaking, atomic assignment does not guarantee that a write is promptly visible to other threads; marking the flags volatile covers that.)
There are much more complex possibilities involving semaphores and other strategies, but if simple works....
using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        // volatile so writes on one thread are visible to readers on another
        private volatile bool _busy;
        private volatile bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }
            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));
                _work.Invoke();
                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));
                _busy = false;
            } while (_flagged);
        }
    }
}
Seems to me that you're overcomplicating this. I've written these "pipeline" classes before; all you need is a queue of workers each with a wait handle that gets signaled after the action is complete.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}
And here's a test program, which you can use to verify that actions always get executed in the correct order and only one of the same action can ever execute at once:
using System;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);

        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}
Short, simple, completely thread-safe.
If for some reason you need to start working at a specific step in the chain instead, then you can just add an overload for `Start`:
public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index))
        currentStage.Execute();
}
Edit
Based on comments, I think a few minor changes to the inner `Stage` class should be enough to get the kind of behaviour you want. We just need to add a "queued" event in addition to the "ready" event.
class Stage : IDisposable
{
    private readonly Action action;
    private readonly EventWaitHandle readyEvent;
    private readonly EventWaitHandle queuedEvent;

    public Stage(Action action)
    {
        this.action = action;
        this.readyEvent = new AutoResetEvent(true);
        this.queuedEvent = new AutoResetEvent(true);
    }

    public void Dispose()
    {
        readyEvent.Close();
        queuedEvent.Close();
    }

    private bool CanExecute()
    {
        // Fast path: the stage is idle, so claim it immediately.
        if (readyEvent.WaitOne(0, true))
            return true;
        // The stage is running; try to claim the single "queued" slot.
        if (!queuedEvent.WaitOne(0, true))
            return false;
        // We hold the queued slot: wait for the running execution to
        // finish, then release the slot for the next caller.
        readyEvent.WaitOne();
        queuedEvent.Set();
        return true;
    }

    public bool Execute()
    {
        if (!CanExecute())
            return false;
        action();
        readyEvent.Set();
        return true;
    }
}
Also change the pipeline's `Start` method to break if a stage can't execute (i.e. is already queued):
public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index))
        if (!currentStage.Execute())
            break;
}
The concept here is pretty simple, again:
- A stage first tries to immediately acquire the ready state. If it succeeds, then it starts running.
- If it fails to acquire the ready state (i.e. the task is already running), then it tries to acquire the queued state.
- If it gets the queued state, then it waits for the ready state to become available and then releases the queued state.
- If it can't get the queued state either, then it gives up.
I've read over your question and comments again and I'm pretty sure this is exactly what you're trying to do, and gives the best trade-off between safety, throughput, and throttling.
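To see the give-up rule in isolation, here is a stripped-down sketch of the same two-gate idea. The `TwoGate` class and its members are names of my own for illustration, not part of the answer's `Pipeline`:

```csharp
using System;
using System.Threading;

// Standalone sketch of the two-gate pattern: "ready" guards execution,
// "queued" admits exactly one waiter; any further caller gives up.
public static class TwoGate
{
    private static readonly AutoResetEvent ready = new AutoResetEvent(true);
    private static readonly AutoResetEvent queued = new AutoResetEvent(true);

    public static bool TryExecute(Action action)
    {
        if (!ready.WaitOne(0))       // fast path: is the stage idle?
        {
            if (!queued.WaitOne(0))  // running; is the queue slot free?
                return false;        // no: give up
            ready.WaitOne();         // yes: wait for the running call
            queued.Set();            // release the slot for the next caller
        }
        try { action(); }
        finally { ready.Set(); }
        return true;
    }
}
```

Uncontended calls always run; only a third concurrent caller (one running, one queued) ever sees `false`.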
Because the `ThreadPool` can sometimes take a while to respond, you should up the delay in the test program to `1000` instead of `100` if you want to really see the "skips" happening.