开发者

Joining on the first finished thread?

开发者 https://www.devze.com 2023-03-22 23:53 出处:网络
I\'m writing up a series of graph-searching algorithms in F# and thought it would be nice to take advantage of parallelization. I wanted to execute several threads in parallel and take the result of t

I'm writing up a series of graph-searching algorithms in F# and thought it would be nice to take advantage of parallelization. I wanted to execute several threads in parallel and take the result of the first one to finish. I've got an implementation, but it's not pretty.

Two questions: is there a standard name for this sort of function? Not a Join or a JoinAll, but a JoinFirst? Second, is there a more idiomatic way to do this?

//implementation
let makeAsync (locker:obj) (shared:'a option ref) (f:unit->'a) =
    async {
        let result = f()
        Monitor.Enter locker
        shared := Some result
        Monitor.Pulse locker
        Monitor.Exit locker
    }

let firstFinished test work =
    let result = ref Option.None
    let locker = new obj()
    let cancel = new CancellationTokenSource()    
    work |> List.map (makeAsync locker result) |> List.map (fun a-> As开发者_StackOverflow中文版ync.StartAsTask(a, TaskCreationOptions.None, cancel.Token)) |> ignore
    Monitor.Enter locker
    while (result.Value.IsNone || (not <| test result.Value.Value)) do
        Monitor.Wait locker |> ignore
    Monitor.Exit locker
    cancel.Cancel()
    match result.Value with
    | Some x-> x
    | None -> failwith "Don't pass in an empty list"
//end implentation

//testing
let delayReturn (ms:int) value = 
    fun ()->
        Thread.Sleep ms
        value

let test () =
    let work = [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
    let result = firstFinished (fun _->true) work
    printfn "%s" result


Would it work to pass the CancellationTokenSource and test to each async and have the first that computes a valid result cancel the others?

let makeAsync (cancel:CancellationTokenSource) test f =
  let rec loop() =
    async {
      if cancel.IsCancellationRequested then 
        return None
      else
        let result = f()
        if test result then
          cancel.Cancel()
          return Some result
        else return! loop()
    }
  loop()

let firstFinished test work =
  match work with
  | [] -> invalidArg "work" "Don't pass in an empty list"
  | _ ->
    let cancel = new CancellationTokenSource()    
    work
    |> Seq.map (makeAsync cancel test) 
    |> Seq.toArray
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.pick id

This approach makes several improvements: 1) it uses only async (it's not mixed with Task, which is an alternative for doing the same thing--async is more idiomatic in F#); 2) there's no shared state, other than CancellationTokenSource, which was designed for that purpose; 3) the clean function-chaining approach makes it easy to add additional logic/transformations to the pipeline, including trivially enabling/disabling parallelism.


With the Task Parallel Library in .NET 4, this is called WaitAny. For example, the following snippet creates 10 tasks and waits for any of them to complete:

open System.Threading

Array.init 10 (fun _ ->
  Tasks.Task.Factory.StartNew(fun () ->
    Thread.Sleep 1000))
|> Tasks.Task.WaitAny


In case you are ok to use "Reactive extensions (Rx)" in your project, the joinFirst method can be implemented as:

let joinFirst (f : (unit->'a) list) = 
    let c = new CancellationTokenSource()
    let o = f |> List.map (fun i ->
                    let j = fun() -> Async.RunSynchronously (async {return i() },-1,c.Token)
                    Observable.Defer(fun() -> Observable.Start(j))
                    )
            |> Observable.Amb
    let r = o.First()
    c.Cancel()
    r

Example usage:

[20..30] |> List.map (fun i -> fun() -> Thread.Sleep(i*100); printfn "%d" i; i)
|> joinFirst |> printfn "Done %A"
Console.Read() |> ignore

Update:

Using Mailbox processor :

type WorkMessage<'a> = 
      Done of 'a
    | GetFirstDone of AsyncReplyChannel<'a>

let joinFirst (f : (unit->'a) list) = 
    let c = new CancellationTokenSource()
    let m = MailboxProcessor<WorkMessage<'a>>.Start(
              fun mbox -> async { 
                let afterDone a m = 
                    match m with
                    | GetFirstDone rc -> 
                        rc.Reply(a);
                        Some(async {return ()})
                    | _ -> None
                let getDone m = 
                    match m with
                    |Done a -> 
                        c.Cancel()
                        Some (async {
                                do! mbox.Scan(afterDone a)
                                })  
                    |_ -> None
                do! mbox.Scan(getDone)
                return ()
             } )
    f 
    |> List.iter(fun t -> try 
                            Async.RunSynchronously (async {let out = t()
                                                           m.Post(Done out)
                                                           return ()},-1,c.Token)
                          with
                          _ -> ())
    m.PostAndReply(fun rc -> GetFirstDone rc)


Unfortunately, there is no built-in operation for this provided by Async, but I'd still use F# asyncs, because they directly support cancellation. When you start a workflow using Async.Start, you can pass it a cancellation token and the workflow will automatically stop if the token is cancelled.

This means that you have to start workflows explicitly (instead of using Async.Parallel), so the synchronizataion must be written by hand. Here is a simple version of Async.Choice method that does that (at the moment, it doesn't handle exceptions):

open System.Threading

type Microsoft.FSharp.Control.Async with
  /// Takes several asynchronous workflows and returns 
  /// the result of the first workflow that successfuly completes
  static member Choice(workflows) = 
    Async.FromContinuations(fun (cont, _, _) ->
      let cts = new CancellationTokenSource()
      let completed = ref false
      let lockObj = new obj()
      let synchronized f = lock lockObj f

      /// Called when a result is available - the function uses locks
      /// to make sure that it calls the continuation only once
      let completeOnce res =
        let run =
          synchronized(fun () ->
            if completed.Value then false
            else completed := true; true)
        if run then cont res

      /// Workflow that will be started for each argument - run the 
      /// operation, cancel pending workflows and then return result
      let runWorkflow workflow = async {
        let! res = workflow
        cts.Cancel()
        completeOnce res }

      // Start all workflows using cancellation token
      for work in workflows do
        Async.Start(runWorkflow work, cts.Token) )

Once we write this operation (which is a bit complex, but has to be written only once), solving the problem is quite easy. You can write your operations as async workflows and they'll be cancelled automatically when the first one completes:

let delayReturn n s = async {
  do! Async.Sleep(n) 
  printfn "returning %s" s
  return s }

Async.Choice [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
|> Async.RunSynchronously

When you run this, it will print only "returning First!" because the second workflow will be cancelled.

0

精彩评论

暂无评论...
验证码 换一张
取 消