I am trying to use Scala to find the parameter to a function that yields the largest return value, and I would like to do it in parallel. So for this function:
def f(i: Long): Double = {
  // do something with i and return a double
}
I want to find the input parameter i over the range (0, x) that gives the maximum value when passed to the function f. This is what I have so far:
import scala.concurrent.ops._
def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  replicate(0, xs.length) { i => results(i) = f(xs(i)) }
  results
}
var results = parMap(i => (f(i), i), List.range(0, x)).max
It might work correctly, but I get java.lang.OutOfMemoryError: Java heap space. For the problem I am working on, the entire set of results will be too large to fit in memory, so it needs to discard results that are inferior to the best seen so far. If I make the list range small enough for everything to fit in memory, my results Array (before it calls the max method) looks something like this:
Array(null, null, (-Infinity,2), (-Infinity,3), null, (-Infinity,5), (-Infinity,6), (-Infinity,7), (-Infinity,8), (-22184.3237904591,9), null, (-22137.315048628963,11)...
The -Infinity values are normal for what I am doing, but the nulls are not. The nulls show up in different positions each time I run it, so it is random. It is as if the replicate method 'gives up' on some of the function calls and leaves null instead.
Note I am using Scala 2.8.1.
Also, it seems to me accurate documentation on Scala and parallel computing is hard to come by. I would like to learn more, so I can figure out problems like this one on my own. Can anyone suggest a reliable resource I can learn from?
I'm not fully up to speed with the 2.9 parallel collections, and I'm not sure concurrent.ops is all that well maintained, but it seems to me that your task is perfectly well suited to futures in 2.8:
// Setup: you want to use Longs, so you can't use Range (it's Int-based)
val x = 4000000000L  // note that this doesn't fit in a signed Int
def f(l: Long) = l + 8e9/(3+l)

// A minimal Iterator[Long] to stand in for an Int-based Range
def longRange(a: Long, b: Long) = new Iterator[Long] {
  private[this] var i = a
  def hasNext = i < b
  def next = { val j = i; i += 1; j }
}

val cpus = 4
// Split (0, x) into one chunk per CPU and compute each chunk's max in a future
val ranges = (1 to cpus).map(i => longRange(((i-1)*x)/cpus, (i*x)/cpus))
val maxes = ranges.map(r => scala.actors.Futures.future(r.map(f).max))
println("Total max is " + maxes.map(_()).max)
Here you split the work up by hand and ask for a computation of the max over each portion of the range, which is delivered on demand by the iterator. These are computed in the future; that is, Futures.future returns a promise that it will deliver the return value eventually. The promise is actually kept when myFuture.apply() is called, which in this case is the _() inside the println. To get the total max, you have to take the max of maxes, and this of course can't return until all the work put off to the future is actually completed.
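In isolation, the future/apply pattern looks like this (a tiny sketch, separate from the solution above):

import scala.actors.Futures

// future { ... } starts the computation on a pool thread and returns right away;
// calling the future as a function (its apply method) blocks until the value is ready.
val answer = Futures.future { (1 to 1000000).map(_.toLong).sum }
println(answer())  // blocks here if the sum hasn't finished yet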
You can try comparing the runtime of the four-threaded and single-threaded versions if you want to verify that it's working.
(Note that the answer for the function I've provided should be 4.000000001e9.)
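For instance, a rough way to make that comparison (a sketch only; the time helper is just an illustrative timing wrapper, not part of the solution, and with x = 4000000000L the single-threaded run takes a while, so you may want a smaller x for a quick check):

def time[A](label: String)(body: => A): A = {
  val t0 = System.nanoTime
  val result = body
  println(label + ": " + (System.nanoTime - t0) / 1e9 + " s")
  result
}

// Single-threaded: one big iterator over the whole range
val oneMax = time("1 thread") { longRange(0, x).map(f).max }

// Four futures: build fresh iterators, since iterators can only be traversed once
val fourMax = time(cpus + " futures") {
  val rs = (1 to cpus).map(i => longRange(((i - 1) * x) / cpus, (i * x) / cpus))
  rs.map(r => scala.actors.Futures.future(r.map(f).max)).map(_()).max
}

println(oneMax == fourMax)  // both should be 4.000000001e9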
Note also that if you really want things to run quickly, you should probably write your own range tests:
// Scan [a, b) keeping only the best value seen so far; no intermediate collection is built
def maxAppliedRange(a: Long, b: Long, f: Long => Double) = {
  var m = f(a)
  var i = a
  while (i < b) {
    val x = f(i)
    if (m < x) m = x
    i += 1
  }
  m
}
// One future per chunk; each chunk is scanned without allocating per-element objects
val maxes = (1 to cpus).map(i =>
  scala.actors.Futures.future( maxAppliedRange((i-1)*x/cpus, i*x/cpus, f) )
)
println("Total max is " + maxes.map(_()).max)
This gives much better performance because there is no boxing/unboxing, so the garbage collector isn't stressed, and running in parallel therefore pays off much more. This runs ~40x faster for me than the method above, and note that the same will be true with parallel collections. So be careful! Just using more cores isn't necessarily the way to speed up your computations, especially when engaging in a garbage-heavy task.
I think you could do this concisely by using futures backed by the global actor thread pool. In keeping with your original example:
import scala.actors.Futures._
def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = {
  val results = new Array[(Double, Long)](xs.length)
  // Kick off one future per element; each writes its result into the shared array
  val futures = (0 until xs.length).map { i =>
    future { results(i) = f(xs(i)) }
  }
  // Block until every future has completed
  futures.foreach(_())
  results
}
results in:
scala> parMap(l => (l.toDouble,l), List(1,2,3))
res2: Array[(Double, Long)] = Array((1.0,1), (2.0,2), (3.0,3))
This will parallelize the work to be done. If you want to tune it for the number of processors you have, you can set the size of the actor pool with the actors.corePoolSize and actors.maxPoolSize system properties.
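For example (the value 8 is just illustrative), you can pass -Dactors.corePoolSize=8 -Dactors.maxPoolSize=8 on the command line, or set the properties programmatically before the first future is created, since the scheduler reads them when it starts up:

// Must run before the actor scheduler initializes, i.e. before the first
// future or actor is created; the value 8 is illustrative.
System.setProperty("actors.corePoolSize", "8")
System.setProperty("actors.maxPoolSize", "8")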