I'm currently working with two actors in Scala. One, the producer, produces some data and sends it to a parser. The producer sends a HashMap[String,HashMap[Object,List[Int]]] through a message (along with this to mark the sender):
parser ! (this, data)
The parser is constantly waiting for messages like so:
def act() {
  loop {
    react {
      case (producer, data) => parse(data)
    }
  }
}
The program runs perfectly under normal circumstances. The problem comes with big volumes of data and many messages: when the outer hash has about 10^4 elements, each inner hash about 100 elements, and each list is 100 long, the program crashes. It shows no Error or Exception. It just stops.
The problem seems to be that my producer works much faster than the parser (and for the moment I don't want more than one parser).
After reading "scala mailbox size limit" I wonder if my parser's mailbox is reaching its limit. The post also offers some solutions, but I first need to make sure this is the problem. How can I test this?
Is there a way to know the actor's memory limit? What about reading the used/free memory in the mailbox?
Any suggestions for the workflow that haven't been posted in that link are also welcome.
Thanks,
First, you need not pass the sender explicitly, as the sender is tracked by the Scala actors framework anyway. You can always access the sender of a message using the method sender.
As can be seen here: scala.actors.MQueue, an actor's mailbox is implemented as a linked list and is therefore only bounded by the size of the heap.
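To check whether the heap really is filling up, one option is to log the JVM's memory figures while the program runs. The following is a minimal sketch using only java.lang.Runtime; the object name HeapCheck and its helpers are made up for illustration. The payload estimate counts just 4 bytes per Int, so it is a lower bound: boxed Ints held in a List and the hash tables themselves take considerably more.

```scala
object HeapCheck {
  private val rt = Runtime.getRuntime

  // Bytes currently in use on the heap (allocated minus free).
  def usedBytes: Long = rt.totalMemory - rt.freeMemory

  // Upper limit the JVM will try to use (set with -Xmx).
  def maxBytes: Long = rt.maxMemory

  // Lower bound for one message: outer * inner * listLength Ints at 4 bytes each.
  def payloadLowerBound(outer: Long, inner: Long, listLength: Long): Long =
    outer * inner * listLength * 4L

  def report(): String = {
    val mb = 1024L * 1024L
    "heap used " + usedBytes / mb + " MB of max " + maxBytes / mb + " MB"
  }

  def main(args: Array[String]): Unit = {
    println(report())
    // Approximate sizes from the question: 10^4 entries, 100 inner entries, lists of 100.
    println("one message >= " + payloadLowerBound(10000, 100, 100) + " bytes")
  }
}
```

Calling HeapCheck.report() from the producer each time it sends a message would show whether used memory climbs toward the maximum as messages queue up. With the sizes from the question, a single message already carries at least 4 * 10^8 bytes of Int data, so only a handful of queued messages would be needed to exhaust a typical default heap.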
Still, if you are concerned that the producer is very fast and the consumer is very slow, I suggest that you explore a throttling mechanism. But I wouldn't recommend the approach from the accepted answer to question scala mailbox size limit.
Trying to send overload messages when the system is heavily stressed doesn't seem to be a good idea, generally. What if your system is too busy to check for overload? What if the receiver of the overload message is too busy to act on it? Also, dropping messages doesn't sound like a very good idea to me. I would think that you want all your work items processed reliably.
Also, I wouldn't rely on the mailboxSize to determine load. You cannot distinguish different message types and you can only check from within the consumer itself, not from the producer.
I suggest using an approach where the consumer requests more work when it knows it can handle it.
Below is a simple example of how it could be implemented.
import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

// Sent by the consumer to request up to `count` more items.
case class Produce(count: Int)
// Sent by the producer once the source is exhausted.
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {
  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      }
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    // Pull items one at a time; takeWhile on the shared iterator would
    // read one extra item per batch and discard it.
    while (remaining > 0 && source.hasNext) {
      sender ! source.next()
      remaining -= 1
    }
    if (!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {
  // Number of requested items that have not been processed yet.
  private var remaining = 0
  // Start only after the field above has been initialized.
  start

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  // Ask for another batch once fewer than 5 items are outstanding.
  def requestWork() = if (remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}
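For comparison, the same "producer may not run ahead of the consumer" idea can be had without actors by putting a bounded blocking queue between two threads. This is a different technique from the request-based protocol above, sketched with java.util.concurrent.ArrayBlockingQueue. The object name BoundedHandoff, the method name transfer, the capacity of 10, and the use of None as an end marker are arbitrary choices for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedHandoff {
  // Starts a producer thread and consumes on the calling thread.
  // Returns the sum of all consumed items so the result can be checked.
  def transfer(items: Iterator[Int], capacity: Int): Long = {
    // Some(x) carries a work item, None marks the end of the stream.
    val queue = new ArrayBlockingQueue[Option[Int]](capacity)

    val producer = new Thread(new Runnable {
      def run(): Unit = {
        // put blocks while the queue is full, which throttles the producer.
        items.foreach(x => queue.put(Some(x)))
        queue.put(None)
      }
    })
    producer.start()

    var sum = 0L
    var done = false
    while (!done) {
      // take blocks while the queue is empty.
      queue.take() match {
        case Some(x) => sum += x
        case None    => done = true
      }
    }
    producer.join()
    sum
  }

  def main(args: Array[String]): Unit =
    println(transfer(Iterator.range(0, 10000), 10))
}
```

At most capacity items are ever waiting, so memory use stays bounded however fast the producer is. The trade-off is that both sides block a thread while waiting, whereas the react-based actors above do not.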