2016-04-06 12 views
7

ベルヌーイ分布をシミュレートするために乱数を考慮する必要があるメソッドを記述しました。私はrandom.nextDoubleを使用して0と1の間の数値を生成し、その値に基づいて自分の確率パラメータを決定します。スパーク - 乱数生成

私の問題は、Sparkが私のforループマッピング関数の各反復で同じ乱数を生成していることです。私はDataFrame APIを使用しています。私のコードは、以下のフォーマットに準拠します。ここでは

val myClass = new MyClass() 
val M = 3 
val myAppSeed = 91234 
val rand = new scala.util.Random(myAppSeed) 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 

クラスです:

class myClass extends Serializable { 
    val q = qProb 

    def myMethod(s: String, rand: Double) = { 
    if (rand <= q) // do something 
    else // do something else 
    } 
} 

私はmyMethodが呼び出されるたびに新しい乱数を必要としています。私はまた、以下のような(Serializableを拡張しないscala.util.Random V10)java.util.Randomと私の方法の中に数字を生成しようとしたが、私はまだ、私はいくつかの研究を行ってきたループ

val r = new java.util.Random(s.hashCode.toLong) 
val rand = r.nextDouble() 

各内の同じ番号を取得していますし、これは、スパークスの決定論的性質と関係しているようです。

答えて

2

繰り返される同じシーケンスがランダムジェネレータが作成され、データが分割される前に、シードで初期化されていることである理由。各パーティションは同じランダムシードから開始します。そうでないかもしれない、それを行うための最も効率的な方法は、しかし、次のように動作するはずです:

val myClass = new MyClass() 
val M = 3 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{ 
     val rand = scala.util.Random 
     row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 
+0

1のでこれは非常に、私の問題を解決するが、私はおそらく将来的に使用されるそのエレガントな解決策ではありませんでした。私は乱数を自分のメソッドに渡し、そこから乱数を生成しました。これは私の問題を解決しましたが、直列化の理由から 'java.util.Random'を使用しなければなりませんでした。 –

4

ちょうどSQL関数randを使用します。

import org.apache.spark.sql.functions._ 

//df: org.apache.spark.sql.DataFrame = [key: int] 

df.select($"key", rand() as "rand").show 
+---+-------------------+ 
|key|    rand| 
+---+-------------------+ 
| 1| 0.8635073400704648| 
| 2| 0.6870153659986652| 
| 3|0.18998048357873532| 
+---+-------------------+ 


df.select($"key", rand() as "rand").show 
+---+------------------+ 
|key|    rand| 
+---+------------------+ 
| 1|0.3422484248879837| 
| 2|0.2301384925817671| 
| 3|0.6959421970071372| 
+---+------------------+ 
+0

私は私の問題を解決するために、わずかにこれを修正 –

2

this postによると、最善の解決策は、マップ内のnew scala.util.Randomを置くことはなく、また完全に外(すなわちドライバコードインチ)が、中間mapPartitionsWithIndex中:

import scala.util.Random 
val myAppSeed = 91234 
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) => 
    val rand = new scala.util.Random(indx+myAppSeed) 
    iter.map(x => (x, Array.fill(10)(rand.nextDouble))) 
}