2010-12-13 8 views
2

私はScalaを使用して、最大の戻り値をもたらす関数へのパラメータを見つけようとしています。私はそれを並行して実行したいと思います。したがって、関数fに渡されたときに最大値を与える範囲(0、x)上の入力パラメータiを探したいと思います。これは私がこれまで持っているものです。並列計算を行うためにScala並行プログラミングを使用するにはどうすればよいですか?

import scala.concurrent.ops._ 

def parMap(f: Long => (Double, Long), xs: List[Int]): Array[(Double, Long)] = { 
    val results = new Array[(Double, Long)](xs.length) 
    replicate(0, xs.length) { i => results(i) = f(xs(i)) } 
    results 
} 

var results = parMap(i => (f(i), i), List.range(0, i)).max 

それが正常に働くかもしれないが、私はjava.lang.OutOfMemoryErrorを取得:Javaのヒープ領域のエラーを。私が取り組んでいる問題については、結果全体がメモリに収まらないほど大きすぎるため、これまで見たものより劣った結果を破棄する必要があります。私は、メモリ内のすべてのフィット感と、それのために十分に小さいリストの範囲を作成する場合は、私の結果は、Array(それが最大のメソッドを呼び出す前に)一種の次のようになります。

Array(null, null, (-Infinity,2), (-Infinity,3), null, (-Infinity,5), (-Infinity,6), (-Infinity,7), (-Infinity,8), (-22184.3237904591,9), null, (-22137.315048628963,11)... 

-Infinity値は、私は何のために正常ですやっていますが、nullはありません。私はそれがランダムですので、私はそれを実行するたびに異なるnullを取得します。これは、いくつかの関数呼び出しでレプリケート・メソッド 'give up'に似ていて、代わりにnullを与えます。

注意Scala 2.8.1を使用しています。

また、私にはスカラとパラレルコンピューティングについての正確なドキュメントは出てこないようです。私はもっ​​と学びたいので、私はこのような問題を自分で見つけ出すことができます。誰もが私が学ぶことができる信頼できるリソースを提案することはできますか?

+0

ようこそ。 "...それは働きません"というスタイルのステートメントは、あまり働かないので(エラーメッセージ/症状のような詳細を含むように編集することができます)、 2.7と2.8の差になります)。 –

+1

Scala 2.9 Parallelコレクションをご覧くださいhttp://stackoverflow.com/q/3740505/203968、2.9はこちらhttp://www.scala-lang.org/node/212/distributions – oluies

+1

Scala 2.9にもありますmaxByメソッドなので、書き込むことができます(0までi).par.maxBy(f) –

答えて

4

私は2.9並列コレクションを速度まで完全にはないんだけど、私はconcurrent.opsはすべてがうまく維持されてわからないんだけど、あなたのタスクは2.8で先物に完全に適しているように私には思える:

// Setup--you want to use longs, so you can't use range 
val x = 4000000000L // Note that this doesn't fit in a signed integer 
def f(l: Long) = l + 8e9/(3+l) 
def longRange(a: Long, b: Long) = new Iterator[Long] { 
    private[this] var i = a 
    def hasNext = i<b 
    def next = { val j = i; i += 1; j } 
} 

val cpus = 4 
val ranges = (1 to cpus).map(i => longRange(((i-1)*x)/cpus, (i*x)/cpus)) 
val maxes = ranges.map(r => scala.actors.Futures.future(r.map(f).max)) 
println("Total max is " + maxes.map(_()).max) 

ここでは、作業を手作業で分割し、範囲の各部分のmaxの計算を求めます。これは、イテレータによって必要に応じて配信されます。これらは将来計算されます。すなわち、Futures.futureは最終的に戻り値を返すという約束を返します。約束は、myFuture.apply()が呼び出されたときに実際に保持されます。この場合、の中の_()です。最大値を得るためには最大値を取らなければなりません。そして、これはもちろん、未来へのすべての仕事が実際に完了するまで戻ることはできません。

動作していることを確認したい場合は、4スレッドバージョンと1スレッドバージョンの実行時間を比較してみることができます。

は(私が提供してきた機能のための答えは4.000000001e9でなければならないことに注意してください。)

注意また、あなたが本当に物事がすぐに実行したい場合、あなたはおそらく、あなた自身の範囲のテストを記述する必要があること:

def maxAppliedRange(a: Long, b: Long, f: Long=>Double) = { 
    var m = f(a) 
    var i = a 
    while (i < b) { 
    val x = f(i) 
    if (m < x) m = x 
    i += 1 
    } 
    m 
} 
val maxes = (1 to cpus).map(i => 
    scala.actors.Futures.future(maxAppliedRange((i-1)*x/cpus,i*x/cpus,f)) 
) 
println("Total max is " + maxes.map(_()).max) 

ボクシング/アンボクシングが存在しないため、ガベージコレクタに負荷がかからず、並列実行するとパフォーマンスが向上します。これは上記の方法よりも40倍も速くなりますが、これは並列コレクションでも同様です。ので注意してください!より多くのコアを使用するだけでは、計算を高速化する方法ではなく、特にゴミの重いタスクを実行する場合は、必ずしもそうではありません。

+0

あなたの思いやりのある応答に感謝します。私は両方の例を試してみたところ、どちらもうまくいきましたが、2番目の例はずっと速かったです。私はいくつかの質問があります。 cpus変数を1に設定するとプログラムは165秒かかりますが、2に設定すると98秒かかります。なぜそれは82秒かかるのですか?私はより多くの改善が期待されました。また、私の次のステップは、自分のコードをとり、Gridgainとクラウドコンピューティングで過給して、さらに強力にすることです。それをするために私に助言がありますか? – Jim

+0

@Jim - 問題の設定に関連するオーバーヘッドがあり、2つのスレッドのうちの遅い方が終了するまで答えを得ることができません(そして、CPUが他の何らかの理由でより多くの操作を行うために使用される場合は遅くなりますシステムタスク)。私は、クラウド上をさまよう前に、並列コンピューティングについてもっと学ぶことは非常に良い考えだと思う。考えなければならないことはたくさんあります。マシンが途中でダウンしたらどうなるでしょうか?遅いマシンと速いマシンのバランスをどのように調整しますか?どのように作品を収集して配布していますか?コミュニケーションと計算のバランスをとるにはどうすればよいですか? –

+0

私はグーグルでやりましたが、並列コンピューティングについて学ぶべきことはたくさんあります。私は最初に多くの読書をしています。これとは別に、Scalaを同時に使用する方法について、どこで知ることができるでしょうか?私はScalaのWebサイトにアクセスし、ドキュメントを読みましたが、もっと学びたいと思っています。 – Jim

0

私は先物を使ってこのことを簡潔に行うことができると思うが、グローバルアクタースレッドプールも使用すると思う。

import scala.actors.Futures._ 

def parMap(f: Long => (Double,Long), xs: List[Int]) : Array[(Double,Long)] = { 
    val results = new Array[(Double, Long)](xs.length) 
    val futures = (0 until xs.length).map { i => 
    future { results(i) = f(xs(i)) } 
    } 
    futures.foreach(_()) 
    results 
} 

結果をに:あなたの元の例を踏まえて

scala> parMap(l => (l.toDouble,l), List(1,2,3)) 
res2: Array[(Double, Long)] = Array((1.0,1), (2.0,2), (3.0,3)) 

これはやるべき仕事を並列化します。所有しているプロセッサの数に合わせて最適化する場合は、actors.corePoolSizeおよびactors.maxPoolSizeプロパティを使用してアクタープールのサイズを設定できます。

関連する問題