
How to avoid a race condition when using Scala's Actor

Developer | https://www.devze.com | 2023-02-28 10:41 | Source: Internet
I am writing a piece of code that populates a MongoDB collection whenever the buffer (a list) grows to a certain size.

import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  // Buffers one document; calling add() with no argument flushes what is left
  def add(info: DBObject = null) {
    if (info != null) buffer += info

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      println("adding a batch")
      mongoCol.insert(buffer.toList)
      buffer.clear()
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()              // flush the remaining documents
          mongoConn.close()
          exit()
      }
    }
  }
}

However, when I run this code, Scala occasionally throws a "ConcurrentModificationException" on the "mongoCol.insert(buffer.toList)" line. I am pretty sure it has something to do with "mongoCol.insert". Is there anything fundamentally wrong with the code, or should I use something like "atomic {...}" from Akka to avoid the issue?

Here's the complete stack trace:

PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
    at com.mongodb.DBCollection.insert(DBCollection.java:85)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
    at PopulateDB.add(PopulateDB.scala:14)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
    at scala.actors.ReactorTask.run(ReactorTask.scala:34)
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
    at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
    at PopulateDB.resumeReceiver(PopulateDB.scala:5)
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
    at PopulateDB.searchMailbox(PopulateDB.scala:5)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.ReactorTask.run(ReactorTask.scala:36)
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

Thanks, Derek

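DBObject is not thread-safe, and you're sending a DBObject to your actor in a message. It is most likely being modified again afterwards by the sending code, and that is what causes the concurrent modification problem: BSONEncoder is iterating over the object's fields (the LinkedHashMap frames at the top of the stack trace) while another thread changes them.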
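The exception comes from LinkedHashMap's fail-fast iterator, which BSONEncoder uses to walk the document's fields. The minimal sketch below is single-threaded so the failure is deterministic; it assumes a plain java.util.LinkedHashMap as a stand-in for the map behind a BasicDBObject, and FailFastDemo is a hypothetical name. Adding a key between two next() calls is enough to throw, while replacing the value of an existing key is not a structural change and passes the check. In the actor version the change happens on another thread, which is why it only fails occasionally.

```scala
import java.util.{ConcurrentModificationException, LinkedHashMap}

// Hypothetical demo object; LinkedHashMap stands in for the map backing a BasicDBObject.
object FailFastDemo {
  // Starts iterating over a small document, applies `change` mid-iteration,
  // and reports whether the next step throws ConcurrentModificationException.
  private def throwsAfter(change: LinkedHashMap[String, AnyRef] => Unit): Boolean = {
    val doc = new LinkedHashMap[String, AnyRef]()
    doc.put("level", "INFO")
    doc.put("msg", "started")
    val it = doc.entrySet().iterator()
    it.next()      // the "encoder" has visited the first field
    change(doc)    // someone else touches the same map
    try {
      it.next()    // fail-fast check: was the map structurally modified?
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  // Adding a new key is a structural modification, so iteration fails.
  def addKeyDuringIteration(): Boolean =
    throwsAfter(d => { d.put("host", "web-1"); () })

  // Replacing the value of an existing key is not structural, so iteration continues.
  def replaceValueDuringIteration(): Boolean =
    throwsAfter(d => { d.put("level", "WARN"); () })
}
```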

To start with, I would suggest calling clone() on the DBObject as it comes into the actor and putting the copy into your buffer (clone() is public on BasicDBObject, not on the DBObject interface). It is only a shallow copy, but it should at least be enough to avoid concurrent modification problems on the LinkedHashMap that backs the keys of a DBObject (the keys stay ordered by virtue of the LinkedHashMap).

I'd try:

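  def act() {
    loop {
      react {
        // Buffer a shallow copy instead of the sender's own object
        case info: BasicDBObject => add(info.clone().asInstanceOf[DBObject])

        case msg if msg == "closeConnection" =>
          println("Close connection")
          // ... rest of the shutdown handling as before
      }
    }
  }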

If that doesn't work, look at anywhere else you are modifying the DBObject after it is sent to the Actor.
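The copy-on-receive idea can be checked without MongoDB. This sketch assumes java.util.LinkedHashMap's copy constructor as a stand-in for BasicDBObject.clone(), since both produce a shallow copy; CopyOnReceive and snapshot are hypothetical names. After the copy, keys the sender adds to its own object no longer reach the buffered one, but nested mutable values (here a java.util.ArrayList) are still shared, so those would need a deep copy, or the sender must stop touching them.

```scala
import java.util.{ArrayList, LinkedHashMap}

object CopyOnReceive {
  // Hypothetical helper: a shallow copy (new map, same value references),
  // standing in for BasicDBObject.clone().
  def snapshot(doc: LinkedHashMap[String, AnyRef]): LinkedHashMap[String, AnyRef] =
    new LinkedHashMap[String, AnyRef](doc)

  // The sender keeps adding keys to its own object after "sending" a copy.
  // Returns (size of the sender's object, size of the buffered copy).
  def sizesAfterSenderMutates(): (Int, Int) = {
    val original = new LinkedHashMap[String, AnyRef]()
    original.put("msg", "first")
    val buffered = snapshot(original)      // what the actor would buffer
    original.put("extra", "added later")   // does not reach the copy
    (original.size, buffered.size)
  }

  // Shallow means nested mutable values are still shared with the sender.
  def nestedValueIsShared(): Boolean = {
    val tags = new ArrayList[String]()
    val original = new LinkedHashMap[String, AnyRef]()
    original.put("tags", tags)
    val buffered = snapshot(original)
    tags.add("late")                       // the sender mutates the nested list
    buffered.get("tags").asInstanceOf[ArrayList[String]].size == 1
  }
}
```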

Why is PopulateDB declared as a class, as below?

class PopulateDB extends Actor

Do you keep multiple PopulateDB actors? I'd expect object PopulateDB extends Actor, so that a single actor handles this task.

Aside from that, the problem seems to be inside Casbah or MongoDB itself.
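The single-actor suggestion can be sketched as follows. scala.actors has since been deprecated in favour of Akka, so instead of the actors API this sketch assumes a plain java.util.concurrent.LinkedBlockingQueue as the mailbox and one worker thread as the actor; BatchWriter, Doc, Close and insertBatch are hypothetical names, with insertBatch standing in for mongoCol.insert. Only the worker thread touches the buffer, and documents are immutable Scala Maps, so a sender cannot change a document after handing it over. Close flushes the partial batch, like calling add() with no argument.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Messages: documents are immutable Maps, so they cannot change after sending.
sealed trait Msg
final case class Doc(fields: Map[String, String]) extends Msg
case object Close extends Msg

// Hypothetical stand-in for a single PopulateDB actor.
final class BatchWriter(batchSize: Int, insertBatch: List[Map[String, String]] => Unit) {
  private val mailbox = new LinkedBlockingQueue[Msg]()

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      val buffer = new ListBuffer[Map[String, String]] // owned by this thread only
      def flush(): Unit =
        if (buffer.nonEmpty) {
          insertBatch(buffer.toList)
          buffer.clear()
        }
      var running = true
      while (running) {
        mailbox.take() match {            // blocks until a message arrives
          case Doc(fields) =>
            buffer += fields
            if (buffer.length >= batchSize) flush()
          case Close =>
            flush()                       // write the partial last batch
            running = false
        }
      }
    }
  })
  worker.start()

  def send(msg: Msg): Unit = mailbox.put(msg)
  def awaitTermination(): Unit = worker.join()
}
```

With a batch size of 2, sending five documents and then Close produces three batches: two full ones and a final batch holding the last document.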


