开发者

How to Cancel an Akka actor?

开发者 https://www.devze.com 2023-04-04 17:24 出处:网络
I have an akka actor(worker) that receives a request and replies to it. The request processing can take 3-60 minutes. Caller(also an actor) is currently using !!! and waiting on future.get, however th

I have an akka actor(worker) that receives a request and replies to it. The request processing can take 3-60 minutes. Caller(also an actor) is currently using !!! and waiting on future.get, however the design of Caller actor can be changed if required. Also, I'm currently using EventDriven dispatcher.

How do i Cancel(user initiated) the request processing so that the worker actor is freed up and returns to the ready state to receive new requests? I was hoping for a method similar to java.util.concurrent.Future's cancel method but couldn't find in Akka 1.1.3

Edit:

We tried to get the behavior we are looking for with completeWithException:

object Cancel {
  def main开发者_C百科(args: Array[String]) {
    val actor = Actor.actorOf[CancelActor].start
    EventHandler.info(this, "Getting future")
    val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.completeWithException(new Exception("cancel"))
    EventHandler.info(this, "Future is " + future.get)
  }
}

class CancelActor extends Actor {
  def receive = {
    case "request" =>
      EventHandler.info(this, "start")
      (1 to 5).foreach(x => {
        EventHandler.info(this, "I am a long running process")
        Thread.sleep(200L)
      })
      self reply "response"
      EventHandler.info(this, "stop")
  }
}

But that did not stop the long-running process.

    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Getting future
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
    [ERROR]   [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture] 
    java.lang.Exception: cancel
        at kozo.experimental.Cancel$.main(Cancel.scala:15)
...

    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop

In contrast consider the behavior of of a java.util.concurrent.Future:

object Cancel2 {
  def main(args: Array[String]) {
    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    EventHandler.info(this, "Getting future")
    val future = executor.submit(new Runnable {
      def run() {
        EventHandler.info(this, "start")
        (1 to 5).foreach(x => {
          EventHandler.info(this, "I am a long running process")
          Thread.sleep(200L)
        })
      }
    })
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.cancel(true)
    EventHandler.info(this, "Future is " + future.get)
  }
}

Which does stop the long running process

    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    Exception in thread "main" java.util.concurrent.CancellationException
...
    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling


You could also check the status of the Future in the Actor.

class MyActor extends Actor {
  def receive = {
    case msg =>
      while(!self.senderFuture.get.isCompleted) {
        performWork(msg)
      }
      self reply result
  }
  ...
}

This requires the message to be sent with '?' or 'ask' though. Hope it helps.


If you're just in-VM you can just pass along an AtomicBoolean with your Job message and check that in your actor intermittently to see if you should abort.

actor ! Job(..., someAtomicBoolean)

class MyActor extends Actor {
  def receive = {
    case Job(..., cancelPlease) =>
      while(cancelPlease.get == false) {
        performWork
      }
      self reply result
  }
}
0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号