I'm writing up a series of graph-searching algorithms in F# and thought it would be nice to take advantage of parallelization. I wanted to execute several threads in parallel and take the result of the first one to finish. I've got an implementation, but it's not pretty.
Two questions: first, is there a standard name for this sort of function (not a Join or a JoinAll, but a JoinFirst)? Second, is there a more idiomatic way to do this?
//implementation
open System.Threading
open System.Threading.Tasks

let makeAsync (locker:obj) (shared:'a option ref) (f:unit->'a) =
    async {
        let result = f()
        Monitor.Enter locker
        shared := Some result
        Monitor.Pulse locker
        Monitor.Exit locker
    }

let firstFinished test work =
    let result = ref Option.None
    let locker = new obj()
    let cancel = new CancellationTokenSource()
    work
    |> List.map (makeAsync locker result)
    |> List.map (fun a -> Async.StartAsTask(a, TaskCreationOptions.None, cancel.Token))
    |> ignore
    Monitor.Enter locker
    while (result.Value.IsNone || (not <| test result.Value.Value)) do
        Monitor.Wait locker |> ignore
    Monitor.Exit locker
    cancel.Cancel()
    match result.Value with
    | Some x -> x
    | None -> failwith "Don't pass in an empty list"
//end implementation

//testing
let delayReturn (ms:int) value =
    fun () ->
        Thread.Sleep ms
        value

let test () =
    let work = [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
    let result = firstFinished (fun _ -> true) work
    printfn "%s" result
Would it work to pass the CancellationTokenSource and test to each async, and have the first one that computes a valid result cancel the others?
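open System.Threading

let makeAsync (cancel:CancellationTokenSource) test f =
    let rec loop() =
        async {
            if cancel.IsCancellationRequested then
                return None
            else
                // Compute a result and retry until one passes the test
                let result = f()
                if test result then
                    // Signal the other workers to stop
                    cancel.Cancel()
                    return Some result
                else return! loop()
        }
    loop()

let firstFinished test work =
    match work with
    | [] -> invalidArg "work" "Don't pass in an empty list"
    | _ ->
        let cancel = new CancellationTokenSource()
        work
        |> Seq.map (makeAsync cancel test)
        |> Seq.toArray
        // Async.Parallel completes only when every worker has returned; a worker
        // already inside f() is not interrupted, so this waits for that call
        |> Async.Parallel
        |> Async.RunSynchronously
        // Take the first Some in input order
        |> Array.pick id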
This approach makes several improvements: 1) it uses only async (it's not mixed with Task, which is an alternative way of doing the same thing; async is more idiomatic in F#); 2) there's no shared state other than the CancellationTokenSource, which was designed for that purpose; 3) the clean function-chaining approach makes it easy to add more logic or transformations to the pipeline, including trivially enabling or disabling parallelism.
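To make point 3 concrete, here is a minimal sketch in which only the stage that runs the workflows changes. The `runAll` helper, its `inParallel` flag and the `square` workflows are hypothetical and not part of the answer above; in `firstFinished`, this stage corresponds to `Async.Parallel |> Async.RunSynchronously`.

```fsharp
/// Hypothetical helper: runs the workflows either in parallel or one after
/// another, returning their results in input order either way.
let runAll (inParallel : bool) (workflows : Async<'T> array) : 'T array =
    if inParallel then
        workflows |> Async.Parallel |> Async.RunSynchronously
    else
        // Run each workflow to completion before starting the next one
        workflows |> Array.map Async.RunSynchronously

// Illustrative workflows standing in for real work items
let square (x : int) = async { return x * x }

let parallelResults = [| 1; 2; 3 |] |> Array.map square |> runAll true
let sequentialResults = [| 1; 2; 3 |] |> Array.map square |> runAll false
printfn "%A %A" parallelResults sequentialResults
```

With the sequential variant in `firstFinished`, the first worker whose result passes `test` cancels the token, so the workers after it return `None` immediately and `Array.pick id` still finds the winner.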
With the Task Parallel Library in .NET 4, this is called WaitAny. For example, the following snippet creates 10 tasks and waits for any of them to complete:
open System.Threading

Array.init 10 (fun _ ->
    Tasks.Task.Factory.StartNew(fun () ->
        Thread.Sleep 1000))
|> Tasks.Task.WaitAny
|> printfn "Task %d finished first"
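`Task.WaitAny` returns the index of the task that finished first, so getting the winning value is a matter of indexing back into the array. A minimal sketch, assuming the work items are plain `unit -> 'T` functions as in the question; `waitFirst` is a hypothetical name:

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: starts each work item as a Task<'T>, blocks until
/// any of them completes, and returns that task's result.
/// The remaining tasks are not cancelled; they keep running in the background.
let waitFirst<'T> (work : (unit -> 'T) list) : 'T =
    let tasks : Task<'T>[] =
        work
        |> List.map (fun f -> Task.Factory.StartNew(Func<'T>(f)))
        |> Array.ofList
    // WaitAny takes a Task[] and returns the index of the first task to complete
    let index = Task.WaitAny(tasks |> Array.map (fun t -> t :> Task))
    tasks.[index].Result

let delayReturn (ms : int) value = fun () -> Thread.Sleep ms; value

let winner = waitFirst [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
printfn "%s" winner
```

Unlike the async-based answers, nothing here stops the slower tasks; passing a cancellation token to them would have to be added separately.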
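If you're OK with using Reactive Extensions (Rx) in your project, the joinFirst method can be implemented as:
open System
open System.Threading
open System.Reactive.Linq

let joinFirst (f : (unit -> 'a) list) =
    let c = new CancellationTokenSource()
    let o =
        f
        |> List.map (fun i ->
            // Run each work item under the shared cancellation token
            let j = fun () -> Async.RunSynchronously(async { return i() }, -1, c.Token)
            Observable.Defer(fun () -> Observable.Start(j)))
        // Amb mirrors whichever source produces a value first
        |> Observable.Amb
    // First() blocks until that value arrives
    let r = o.First()
    c.Cancel()
    r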
Example usage:
[20..30]
|> List.map (fun i -> fun () -> Thread.Sleep(i * 100); printfn "%d" i; i)
|> joinFirst
|> printfn "Done %A"
Console.Read() |> ignore
Update: using a MailboxProcessor:
open System.Threading

type WorkMessage<'a> =
    | Done of 'a
    | GetFirstDone of AsyncReplyChannel<'a>

let joinFirst (f : (unit -> 'a) list) =
    let c = new CancellationTokenSource()
    let m = MailboxProcessor<WorkMessage<'a>>.Start(fun mbox -> async {
        // Once a result is known, wait for the caller's request and reply with it
        let afterDone a msg =
            match msg with
            | GetFirstDone rc ->
                rc.Reply(a)
                Some(async { return () })
            | _ -> None
        // Wait for the first Done message and cancel the remaining work
        let getDone msg =
            match msg with
            | Done a ->
                c.Cancel()
                Some(async { do! mbox.Scan(afterDone a) })
            | _ -> None
        do! mbox.Scan(getDone) })
    // Start every work item on the thread pool with the shared token, so the
    // items run in parallel rather than one after another
    for t in f do
        let work = async {
            try
                let out = t()
                m.Post(Done out)
            with _ -> () } // a failing work item is ignored
        Async.Start(work, c.Token)
    m.PostAndReply(fun rc -> GetFirstDone rc)
Unfortunately, Async doesn't provide a built-in operation for this, but I'd still use F# asyncs because they directly support cancellation. When you start a workflow using Async.Start, you can pass it a cancellation token, and the workflow will be stopped automatically when the token is cancelled (cancellation is observed at asynchronous operations such as let! and do!, not in the middle of ordinary synchronous code).
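A minimal sketch of that behaviour, under illustrative assumptions: a five-second `Async.Sleep` stands in for real work, and `outcome` and `signal` are hypothetical names. It uses `Async.StartWithContinuations` instead of `Async.Start` only so that the cancellation continuation can record what happened; the token is passed in the same way.

```fsharp
open System
open System.Threading

// Illustrative workflow: waits five seconds, then returns a value
let work = async {
    do! Async.Sleep 5000
    return "finished" }

// Records which continuation ran, so the effect of cancellation is visible
let outcome = ref "none"
let signal = new ManualResetEventSlim(false)
let cts = new CancellationTokenSource()

Async.StartWithContinuations(
    work,
    (fun result -> outcome.Value <- result; signal.Set()),
    (fun (_ : exn) -> outcome.Value <- "failed"; signal.Set()),
    (fun (_ : OperationCanceledException) -> outcome.Value <- "cancelled"; signal.Set()),
    cts.Token)

// Cancel while the workflow is still waiting inside Async.Sleep
cts.Cancel()
signal.Wait(10000) |> ignore
printfn "%s" outcome.Value
```

The sleeping workflow never reaches `return "finished"`; the cancellation continuation runs instead.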
This means that you have to start the workflows explicitly (instead of using Async.Parallel), so the synchronization must be written by hand. Here is a simple version of an Async.Choice method that does that (at the moment, it doesn't handle exceptions):
open System.Threading

type Microsoft.FSharp.Control.Async with
    /// Takes several asynchronous workflows and returns
    /// the result of the first workflow that successfully completes
    static member Choice(workflows) =
        Async.FromContinuations(fun (cont, _, _) ->
            let cts = new CancellationTokenSource()
            let completed = ref false
            let lockObj = new obj()
            let synchronized f = lock lockObj f

            /// Called when a result is available - the function uses locks
            /// to make sure that it calls the continuation only once
            let completeOnce res =
                let run =
                    synchronized (fun () ->
                        if completed.Value then false
                        else
                            completed := true
                            true)
                if run then cont res

            /// Workflow that will be started for each argument - run the
            /// operation, cancel pending workflows and then return result
            let runWorkflow workflow = async {
                let! res = workflow
                cts.Cancel()
                completeOnce res }

            // Start all workflows using the cancellation token
            for work in workflows do
                Async.Start(runWorkflow work, cts.Token) )
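The lock-plus-flag guard in `completeOnce` can also be written with an atomic compare-and-swap. The sketch below swaps in `Interlocked.CompareExchange` (a different technique from the `lock` used above) in a standalone, hypothetical `chooseFirst` function rather than a type extension; like the original, it does not handle exceptions.

```fsharp
open System.Threading

/// Hypothetical variant of the Choice operation: starts all workflows,
/// returns the result of the first one to finish and cancels the rest.
let chooseFirst (workflows : Async<'T> list) : Async<'T> =
    Async.FromContinuations(fun (cont, _, _) ->
        let cts = new CancellationTokenSource()
        // 0 = no result delivered yet, 1 = the continuation has been called
        let completed = ref 0
        let completeOnce res =
            // CompareExchange returns the previous value, so only the first
            // caller (the one that changes 0 to 1) sees 0 and calls cont
            if Interlocked.CompareExchange(&completed.contents, 1, 0) = 0 then
                cont res
        let runWorkflow workflow = async {
            let! res = workflow
            cts.Cancel()
            completeOnce res }
        for work in workflows do
            Async.Start(runWorkflow work, cts.Token))

// Illustrative work items
let delayReturn (n : int) (s : string) = async {
    do! Async.Sleep n
    return s }

let winner =
    chooseFirst [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
    |> Async.RunSynchronously
printfn "%s" winner
```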
Once we write this operation (which is a bit complex, but has to be written only once), solving the problem is quite easy. You can write your operations as async workflows and they'll be cancelled automatically when the first one completes:
let delayReturn (n:int) s = async {
    do! Async.Sleep(n)
    printfn "returning %s" s
    return s }

Async.Choice [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
|> Async.RunSynchronously
When you run this, it will print only "returning First!" because the second workflow will be cancelled.
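Newer versions of FSharp.Core ship a built-in `Async.Choice` with a different signature: it takes workflows that return `'T option`, completes with the first `Some` result (or `None` if every workflow returns `None`), and cancels the rest. Assuming such a version is available, a sketch of the same example without the extension is below; `delayReturnSome` is an illustrative helper. An extension member with the same name, like the one above, may then be shadowed by or conflict with the built-in one, so giving the extension a different name would be the safer choice.

```fsharp
// Assumes an FSharp.Core version that includes the built-in Async.Choice,
// whose signature is seq<Async<'T option>> -> Async<'T option>
let delayReturnSome (n : int) (s : string) = async {
    do! Async.Sleep n
    printfn "returning %s" s
    return Some s }

let first =
    Async.Choice [ delayReturnSome 1000 "First!"; delayReturnSome 5000 "Second!" ]
    |> Async.RunSynchronously

printfn "%A" first
```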