开发者

How to implement asynchronous interdependent cancellable operations with actors?

开发者 https://www.devze.com 2023-03-31 18:47 出处:网络
开发者_如何学CI am quite new to the Actor model, that\'s why I think there are already established patterns addressing my common-looking scenario with such beautiful composable abstractions as actors
开发者_如何学C

I am quite new to the Actor model, that's why I think there are already established patterns addressing my common-looking scenario with such beautiful composable abstractions as actors and futures.

I have asynchronous operations with the following requirements:

  • They use a legacy system by sending out a low-level request and then monitoring the state of the entity with polling. So the result of an actual operation is only available in a deferred way, requesters have to be notified when the observed state reaches the desired state.
  • These operations can be issued only after some other operations are finished, for which they should wait in parallel.
  • The operations can be cancelled. Of course, already issued low-level requests cannot be undone; cancellation means don't issue the actual operation after the operations which we depend on finished, and of course this has to be propagated recursively (if we wait for a dependency, and it has multiple pending operations, don't issue them).

I'm thinking in Futures: the first requirement can be solved with e.g. Akka's map/flatMap, the second with the traverse combinator without maintaining dependencies/dependents procedurally. But I can't think of a solution for cancellation; futures cannot be cancelled, and if composed, their components are not reachable. How to encapsulate "cancel the current operation" in a functional way? Does any of the Actor frameworks for Scala support this?


Use Listeners: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala

Make an Actor that uses Listeners to propagate the state of the polling to any and all listeners. Then you can use message-passing looping to reinitiate the polling.

class MyActor extends Actor with Listeners {

  override def preStart {
    self ! 'poll //Start looping on start
  }

  def receive = listenerManagement orElse {
    case 'poll => val result = pollYourExternalDude()
    gossip(result)
    self ! 'poll //Loop
  }
}

You can then stop the actor using either stop or sending the PoisonPill.

Does that help?


Guava's ListenableFutures support cancellation to a level when bound together (but not when combined from a collection).

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号