
Using MapMaker#makeComputingMap to prevent simultaneous RPCs for the same data

We have a slow backend server that is getting crushed by load and we'd like the middle-tier Scala server to only have one outstanding request to the backend for each unique lookup.

The backend server only stores immutable data, but upon the addition of new data, the middle-tier servers will request the newest data on behalf of the clients, and the backend server has a hard time with the load. The immutable data is cached in memcached using unique keys generated upon the write, but the write rate is high so we get a low memcached hit rate.

One idea I have is to use Google Guava's MapMaker#makeComputingMap() to wrap the actual lookup and after ConcurrentMap#get() returns, the middle-tier will save the result and just delete the key from the Map.

This seems a little wasteful, although the code is very easy to write; see below for an example of what I'm thinking.

Is there a more natural data structure, library or part of Guava that would solve this problem?

import com.google.common.collect.MapMaker

object Test
{
  // Simulated slow backend lookup: sleeps to mimic RPC latency.
  val computer: com.google.common.base.Function[Int,Long] =
    new com.google.common.base.Function[Int,Long] {
      override def apply(i: Int): Long =
      {
        val l = System.currentTimeMillis + i
        System.err.println("For " + i + " returning " + l)
        Thread.sleep(2000)
        l
      }
    }

  val map = new MapMaker().makeComputingMap[Int,Long](computer)

  def get(k: Int): Long =
  {
    val l = map.get(k)  // concurrent callers for the same key block here
    map.remove(k)       // drop the entry once we have the value
    l
  }

  def main(args: Array[String]): Unit =
  {
    val t1 = new Thread() {
      override def run(): Unit =
      {
        System.err.println(get(123))
      }
    }

    val t2 = new Thread() {
      override def run(): Unit =
      {
        System.err.println(get(123))
      }
    }

    t1.start()
    t2.start()
    t1.join()
    t2.join()

    System.err.println(get(123))
  }
}


I'm not sure why you implement the removal yourself; why not simply use weak or soft values and let the GC clean up for you?

new MapMaker().weakValues().makeComputingMap[Int, Long](computer)
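For comparison, the same coalescing behavior (one backend call per key in flight, nothing retained afterwards) can be sketched with plain java.util.concurrent, following the FutureTask memoizer pattern from Java Concurrency in Practice. The names `Coalescer` and `slowLookup` are made up for illustration, and error handling is omitted:

```scala
import java.util.concurrent.{Callable, ConcurrentHashMap, FutureTask}

object Coalescer {
  // One entry per lookup currently in flight; removed on completion.
  private val inFlight = new ConcurrentHashMap[Int, FutureTask[Long]]()

  // Stand-in for the slow backend RPC.
  private def slowLookup(i: Int): Long = {
    Thread.sleep(100)
    System.currentTimeMillis + i
  }

  def get(k: Int): Long = {
    val task = new FutureTask[Long](new Callable[Long] {
      def call(): Long = slowLookup(k)
    })
    val existing = inFlight.putIfAbsent(k, task)
    if (existing == null) {
      task.run()                // we won the race: perform the RPC
      val v = task.get()
      inFlight.remove(k, task)  // drop the entry once the result is in hand
      v
    } else {
      existing.get()            // another thread is fetching; wait for it
    }
  }
}
```

Unlike the computing-map version, nothing is cached between calls: concurrent callers for the same key share one backend call, while sequential callers each trigger their own.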


I think what you do is quite reasonable. You only use the structure to get lock striping on the key, ensuring that accesses to the same key conflict; it doesn't matter that you don't actually need a value mapping per key. ConcurrentHashMap and friends are the only structures in the Java libraries + Guava that offer you lock striping.

This does incur some minor runtime overhead, plus the footprint of a hashtable you don't otherwise need (which might even grow, if accesses to the same segment pile up and remove() doesn't keep up).

If you want to make it as cheap as possible, you could code simple lock striping yourself: basically an Object[] (or Array[AnyRef] :)) of N locks (N = concurrency level), where you map the hash of the lookup key to an index into this array and lock on that element. Another advantage of this is that you don't have to do the hashcode tricks that CHM requires: CHM has to split the hashcode into one part to select the lock and another part for the needs of the hashtable, whereas you can use the whole of it just for lock selection.

edit: Sketching my comment below:

val concurrencyLevel = 16
val locks = (for (i <- 0 until concurrencyLevel) yield new AnyRef).toArray

def access(key: K): V = {
   // Mask the sign bit so negative hash codes still yield a valid index.
   val lock = locks((key.hashCode & 0x7fffffff) % locks.size)
   lock synchronized {
     cache.lookup(key) match {
       case Some(v) => v
       case None =>
         val valueFromBackend = backendServer.lookup(key)
         cache.put(key, valueFromBackend)
         valueFromBackend
     }
   }
}

(Btw, is the toArray call needed? Or is the returned IndexedSeq already fast to access by index?)
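Making the sketch above concrete: here is a self-contained version, where `backendLookup` stands in for the slow RPC and a ConcurrentHashMap stands in for memcached (both stand-ins are assumptions for illustration; in the real setup the cache would be memcached and could miss):

```scala
import java.util.concurrent.ConcurrentHashMap

object StripedCache {
  private val concurrencyLevel = 16
  private val locks = Array.fill(concurrencyLevel)(new AnyRef)
  // Stand-in for memcached; any thread-safe map works under the stripes.
  private val cache = new ConcurrentHashMap[Int, Long]()

  // Stand-in for the slow backend RPC.
  private def backendLookup(key: Int): Long = key.toLong * 2

  def access(key: Int): Long = {
    // Mask the sign bit so negative hash codes still index the array.
    val lock = locks((key.hashCode & 0x7fffffff) % locks.length)
    lock.synchronized {
      if (cache.containsKey(key)) cache.get(key)
      else {
        val v = backendLookup(key)  // only one thread per stripe gets here
        cache.put(key, v)
        v
      }
    }
  }
}
```

Because every access to a given key goes through the same stripe lock, at most one thread performs the backend lookup for that key at a time; threads hashing to other stripes proceed in parallel.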
