
What is the recommended F# pattern for this situation?

Developer https://www.devze.com 2023-03-28 17:56 Source: Internet

I have a situation which resembles the following:

let mutable stopped = false

let runAsync() = async {
    while not stopped do
        let! item = fetchItemToProcessAsync
        match item with
        | Some job -> job |> runJobAsync |> Async.Start
        | None -> do! Async.Sleep(1000)
}

let run() = Async.Start (runAsync())
let stop() =
    stopped <- true

Now when the stop method is called, I have to stop reading further items from the DB and also wait for the ones that are currently started to finish before returning from this function.

What is the best way to accomplish this? I was thinking of using a counter (with the Interlocked APIs) and returning from the stop method when the counter reaches 0.

If there is an alternative way to accomplish this, I would appreciate the guidance. I have a feeling that I could use agents here, but I am not sure whether an agent can do this out of the box or whether I would still have to write custom logic to determine that the jobs have finished executing.
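
For reference, the counter idea from the question could be sketched roughly as follows. This is only a minimal sketch, not a definitive implementation: it uses System.Threading.CountdownEvent as the thread-safe counter that can be waited on, and the Worker type with its fetch and runJob parameters is a hypothetical stand-in for fetchItemToProcessAsync and runJobAsync from the question.

```fsharp
open System.Threading

// Hypothetical wrapper: `fetch` and `runJob` stand in for the question's
// fetchItemToProcessAsync and runJobAsync.
type Worker<'T>(fetch: Async<'T option>, runJob: 'T -> Async<unit>) =
    [<VolatileField>]
    let mutable stopped = false
    // Starts at 1: that slot belongs to the polling loop itself.
    let pending = new CountdownEvent(1)

    let runAsync () = async {
        try
            while not stopped do
                let! item = fetch
                match item with
                | Some job ->
                    // Register the job before starting it.
                    pending.AddCount()
                    async {
                        try
                            do! runJob job
                        finally
                            // Release the slot even if the job fails.
                            pending.Signal() |> ignore
                    } |> Async.Start
                | None -> do! Async.Sleep 1000
        finally
            // The loop no longer fetches items: release its own slot.
            pending.Signal() |> ignore
    }

    member this.Run() = Async.Start (runAsync ())

    /// Stops fetching, then blocks until every started job has finished.
    member this.Stop() =
        stopped <- true
        pending.Wait()
```

Stop assumes that Run was called exactly once beforehand; otherwise the loop's own slot is never released and Wait blocks forever. It can also block for up to a second while the loop is in Async.Sleep.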


Take a look at actor-based patterns and MailboxProcessor.

Basically you can think of it as an async queue. If you pass the list of running jobs (started with Async.StartChild or Async.StartAsTask) as the parameter of your loop inside the MailboxProcessor, you can handle shutdowns gracefully (by waiting, or via a CancellationToken).

Here is a quick sample I put together:


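type Commands = 
    | RunJob of Async<unit>               // start a new job
    | JobDone of int                      // internal: the job with this id has finished
    | Quit of AsyncReplyChannel<unit>     // reply once all jobs are done, then stop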

type JobRunner() =
    let processor =
        MailboxProcessor.Start (fun inbox ->
            let rec loop (nextId, jobs) = async {
                let! cmd = inbox.Receive()
                match cmd with
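                | Quit cb ->
                    if not (Map.isEmpty jobs) then
                        // Jobs are still running: re-post Quit after 100 ms and keep looping.
                        async {
                            do! Async.Sleep 100
                            inbox.Post (Quit cb)
                        } |> Async.Start
                        return! loop (nextId, jobs)
                    else
                        // Nothing left to wait for: reply to the caller and end the loop.
                        cb.Reply()
                        return ()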
                | JobDone id ->
                    return! loop (nextId, jobs |> Map.remove id)
                | RunJob job ->
                    let runJob i = async {
                        do! job
                        inbox.Post (JobDone i)
                    }
                    let! child = Async.StartChild (runJob nextId)
                    return! loop (nextId+1, jobs |> Map.add nextId child)
            }
            loop (0, Map.empty))
    member jr.PostJob(job) = processor.Post (RunJob job)
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb)

let postWaitJob (jobRunner : JobRunner) time =
    let job = async {
        do! Async.Sleep time
        printfn "slept for %d ms" time }
    jobRunner.PostJob job

let testRun() =
    let jr = new JobRunner()
    printfn "starting jobs..."
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000))
    printfn "sending quit"
    jr.Quit()
    printfn "done!"

Hmm ... got some issues with the editor here: it just kills a lot of code when I use the pipe-back operator ... grrr

Short explanation: as you can see, I always provide the inner loop with the next free job id and a Map of id -> async child job. (You can of course implement other or better solutions. The Map is not necessary in this example, but it lets you extend the runner with a command like "Cancel JobNr" or whatever.)

The JobDone message is only used internally to remove jobs from this map.

Quit just checks whether the map is empty. If it is, no additional work is needed and the MailboxProcessor quits (return ()). If it is not empty, a new async workflow is started that just waits 100 ms and then resends the Quit message.

RunJob is rather simple too: it chains the given job with a post of JobDone into the MailboxProcessor and recursively calls loop with the updated values (nextId is incremented by one, and the new job is mapped to the old nextId).
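
If the 100 ms re-posting of Quit bothers you, one possible variation is to keep the reply channel in the loop state and answer it when the last JobDone arrives. The following is only a sketch of that idea under my own assumptions: the names RunnerMsg and NotifyingJobRunner are made up for illustration, it counts jobs instead of keeping the Map, it ignores jobs posted after Quit, and it assumes Quit is called only once.

```fsharp
// Hypothetical variant of the job runner above; all names are illustrative.
type RunnerMsg =
    | RunJob of Async<unit>
    | JobDone
    | Quit of AsyncReplyChannel<unit>

type NotifyingJobRunner() =
    let processor =
        MailboxProcessor<RunnerMsg>.Start(fun inbox ->
            // running: number of jobs in flight
            // waiter: reply channel of a pending Quit, if any
            let rec loop (running: int, waiter: AsyncReplyChannel<unit> option) = async {
                let! cmd = inbox.Receive()
                match cmd with
                | RunJob job when Option.isNone waiter ->
                    async {
                        try
                            do! job
                        finally
                            // Report completion even if the job fails.
                            inbox.Post JobDone
                    } |> Async.Start
                    return! loop (running + 1, waiter)
                | RunJob _ ->
                    // A Quit is pending: new jobs are ignored.
                    return! loop (running, waiter)
                | Quit reply ->
                    if running = 0 then
                        reply.Reply()
                        return ()
                    else
                        // Remember the caller; it is answered by the last JobDone.
                        return! loop (running, Some reply)
                | JobDone ->
                    match waiter with
                    | Some reply when running = 1 ->
                        // Last job finished after Quit: answer and stop.
                        reply.Reply()
                        return ()
                    | _ ->
                        return! loop (running - 1, waiter)
            }
            loop (0, None))
    member this.PostJob(job) = processor.Post (RunJob job)
    member this.Quit() = processor.PostAndReply(fun reply -> Quit reply)
```

It is used the same way as the JobRunner above: post jobs with PostJob, then call Quit, which returns once the last running job has reported JobDone.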


Check out this snippet on fssnip.net. It is a generic job processor which you can use.
