A lightweight Scala fork/join syntax

Developer (https://www.devze.com), 2022-12-16 21:59. Source: web
Even though Java 7 will ship a standard fork/join framework, I am building some helper methods with a lightweight syntax so that client code can run blocks in parallel. Here is a runnable main method to illustrate the idea.

import actors.Futures

object ForkTest2 {



  def main(args: Array[String]) {
    test1
    test2
  }



  def test1 {
    val (a, b, c) =fork({
      Thread.sleep(500)
      println("inside fx1 ",+System.currentTimeMillis)
      true
    }, {
      Thread.sleep(1000)
      println("inside fx2 ",+System.currentTimeMillis)
      "stringResult"
    }, {
      Thread.sleep(1500)
      println("inside fx3 ",+System.currentTimeMillis)
      1
    })

    println(b, a, c)
    true
  }

  def test2 {
    val results = forkAll({
      () =>
              Thread.sleep(500)
              println("inside fx1 ",+System.currentTimeMillis)
              true
    }, {
      () =>
              Thread.sleep(1000)
              println("inside fx2 ",+System.currentTimeMillis)
              "stringResult"
    }, {
      () =>
              Thread.sleep(1500)
              println("inside fx3 ",+System.currentTimeMillis)
              1
    }, {
      () =>
              Thread.sleep(2000)
              println("inside fx4 ",+System.currentTimeMillis)
              1.023
    })

    println(results)
    true
  }

  val tenMinutes = 1000 * 60 * 10

  def fork[A, B, C](
          fx1: => A,
          fx2: => B,
          fx3: => C
          ) = {
    val re1 = Futures.future(fx1)
    val re2 = Futures.future(fx2)
    val re3 = Futures.future(fx3)
    //default wait 10 minutes
    val result = Futures.awaitAll(tenMinutes, re1, re2, re3)
    (
            result(0).asInstanceOf[Option[A]],
            result(1).asInstanceOf[Option[B]],
            result(2).asInstanceOf[Option[C]]

            )
  }

  type fxAny = () => Any

  def forkAll(
          fx1: fxAny*
          ): List[Any] = {
    val results = fx1.toList.map {fx: fxAny => Futures.future(fx())}
    Futures.awaitAll(tenMinutes, results: _*)
  }
}

A sample output is:

(inside fx1 ,1263804802301)
(inside fx2 ,1263804802801)
(inside fx3 ,1263804803301)
(Some(stringResult),Some(true),Some(1))
(inside fx1 ,1263804803818)
(inside fx2 ,1263804804318)
(inside fx3 ,1263804804818)
(inside fx4 ,1263804805318)
List(Some(true), Some(stringResult), Some(1), Some(1.023))

test1 illustrates a type-safe return type.

test2 illustrates an arbitrary number of input arguments.

I hope to combine the two test methods so that client code can run an arbitrary number of functions in parallel and still get type-safe return types.

Another point about the arbitrary function arguments:

I think the line

  type fxAny = () => Any

should really be written as

  type fxAny =  => Any

but the Scala compiler does not allow me to do so.

Any help is appreciated.
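
On the last point, a by-name type such as => Any can appear as a parameter type, and a small helper can turn such a parameter into a () => Any value. The following is only a minimal sketch of that idea; ByNameDemo and thunk are hypothetical names, not part of the question's code.

```scala
object ByNameDemo {
  // Hypothetical helper: takes a by-name parameter and wraps it in an
  // ordinary Function0, which can be stored in a val or passed as a vararg.
  def thunk[T](body: => T): () => T = () => body

  def run(): (Int, Any, Int) = {
    var calls = 0
    val t: () => Any = thunk { calls += 1; "stringResult" }
    val before = calls // the block has not run yet
    val value = t()    // the block runs here
    (before, value, calls)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The counter stays at zero until the thunk is invoked, which is the deferred behaviour the fxAny alias is meant to express.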


Eric Torreborre wrote in the link provided by @retronym:

trait LazyParameters { 
  /** transform a value to a zero-arg function returning that value */ 
  implicit def toLazyParameter[T](value: =>T) = new LazyParameter(() => value) 
  /** class holding a value to be evaluated lazily */ 
  class LazyParameter[T](value: ()=>T) { 
    lazy val v = value() 
    def apply() = v 
  } 
} 
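
To see what the lazy val in LazyParameter does, here is a minimal sketch that builds the wrapper explicitly (no implicit conversion involved) and counts how often the body runs. The class is copied from the snippet above so the example is self-contained; LazyParameterDemo and evaluations are made-up names for illustration.

```scala
object LazyParameterDemo {
  // Same shape as the LazyParameter class quoted above.
  class LazyParameter[T](value: () => T) {
    lazy val v: T = value()
    def apply(): T = v
  }

  // Hypothetical counter, used only to observe how often the body runs.
  var evaluations = 0

  def run(): (Int, Int, Int) = {
    evaluations = 0
    val p = new LazyParameter(() => { evaluations += 1; 42 })
    val before = evaluations // nothing has been evaluated yet
    val first = p()          // first call evaluates the body
    val second = p()         // second call reuses the cached value
    (before, first + second, evaluations)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The body runs once, on the first apply(), and later calls return the cached result.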

Here's a LazyParameter version of your test:

object ForkTest2 extends LazyParameters {

...

def forkAll(fx1: LazyParameter[Any]*): List[Any] = {
  val results = fx1.toList.map {
    fx: LazyParameter[Any] => Futures.future(fx.apply())}
  Futures.awaitAll(tenMinutes, results: _*)
}

Edit: As you've noticed, the implicit conversion evaluates the by-name parameter, so the evaluation delay is not carried forward. Why not just use the word future explicitly? I personally think it makes the code more readable.

import actors.Futures
import actors.Futures.future
import actors.Future

...

def test2 {
  val results = forkAll(
    future {
      Thread.sleep(500)
      println("inside fx1 ",+System.currentTimeMillis)
      true
    },
    future {
      Thread.sleep(1000)
      println("inside fx2 ",+System.currentTimeMillis)
      "stringResult"
    },
    future {
      Thread.sleep(1500)
      println("inside fx3 ",+System.currentTimeMillis)
      1
    },
    future {
      Thread.sleep(2000)
      println("inside fx4 ",+System.currentTimeMillis)
      1.023
    })

  println(results)
  true
}

...

def forkAll(futures: Future[Any]*): List[Any] = {
  println("forkAll")
  Futures.awaitAll(tenMinutes, futures: _*)
}
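
For readers without scala.actors available, roughly the same shape can be sketched with scala.concurrent from the standard library. This is a substitute API, not the one used in this answer, and it behaves differently on timeout: Await.result throws instead of returning None for the unfinished entries. ForkAllSketch and the explicit timeout parameter are assumptions made for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForkAllSketch {
  // Callers start each task with Future { ... }; forkAll only waits for them.
  def forkAll(timeout: FiniteDuration)(futures: Future[Any]*): List[Any] =
    Await.result(Future.sequence(futures.toList), timeout)

  def run(): List[Any] =
    forkAll(10.minutes)(
      Future { Thread.sleep(50); true },
      Future { Thread.sleep(100); "stringResult" },
      Future { 1 }
    )

  def main(args: Array[String]): Unit = println(run())
}
```

Because each Future starts running as soon as it is created, the tasks overlap and the results come back in argument order.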


You can't use call-by-name types as a repeated parameter (aka varargs), thanks to this bug: https://lampsvn.epfl.ch/trac/scala/ticket/237

See recent discussion here: http://old.nabble.com/Lazy-varargs-td27169264.html


A sample implementation for anyone interested.

For use cases with only a few function arguments, you could just define a family of fork functions with different numbers of arguments (following the idea behind the tuple definitions) and enjoy type-safe return types.

object ForkTest4 extends LazyParameters {
  def main(args: Array[String]) {
    test4
  }

  def test4 {
    println("Begin test 4")
    // Omitting the explicit conversion call l(...) would cause early evaluation:
    //    val result4 = forkAll({
    val result4 = forkAll(l {
      Thread.sleep(500)
      println("inside fx4 ", +System.currentTimeMillis)
      true
    })
    println(result4)
    true
  }

  val tenMinutes = 1000 * 60 * 10

  def forkAll(fx1: (() => Any)*): List[Any] = {

    val results = fx1.toList.map {

      fx: (() => Any) => {

        val result = Futures.future(fx.apply())

        result
      }
    }
    Futures.awaitAll(tenMinutes, results: _*)
  }

  def l[T](t: => T): (() => T) = () => t

  implicit def implicitLazy[T](t: => T): (() => T) = () => t
}
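
The idea of defining fork at several arities, mentioned before the listing, could look roughly like the sketch below. It uses scala.concurrent rather than scala.actors, and the names TypedFork, fork2 and fork3 are hypothetical. Unlike the original fork, it returns plain values and throws if the timeout expires.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TypedFork {
  val timeout: FiniteDuration = 10.minutes

  // Two-argument version: both blocks are started before either is awaited.
  def fork2[A, B](fx1: => A, fx2: => B): (A, B) = {
    val f1 = Future(fx1)
    val f2 = Future(fx2)
    Await.result(f1.zip(f2), timeout)
  }

  // Three-argument version, following the Tuple2/Tuple3 pattern.
  def fork3[A, B, C](fx1: => A, fx2: => B, fx3: => C): (A, B, C) = {
    val f1 = Future(fx1)
    val f2 = Future(fx2)
    val f3 = Future(fx3)
    Await.result(for { a <- f1; b <- f2; c <- f3 } yield (a, b, c), timeout)
  }

  def run(): (Boolean, String, Int) =
    fork3({ Thread.sleep(50); true }, { Thread.sleep(100); "stringResult" }, 1)

  def main(args: Array[String]): Unit = println(run())
}
```

Each arity needs its own definition, but the caller gets a correctly typed tuple without any casts.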