2016-03-19 6 views
2

のSpark reduce機能を使用すると予期しない動作が発生しました。ここではサンプルコードがあります:Apache Spark Reduce java.lang.Math.maxによる予期しない動作

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

上記のコードは複数回呼び出されると、それはほとんどの時間は、このような予期せぬ結果を生成します:あなたは減速が値を生成見ることができるように

[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect 
After Reduce: -2392513 //value produced by reducer 

-2392513ただし、この値がさえませんRDDの印刷された値と比較した場合のRDD。それはなぜです? collect()reduce()に影響しますか?私は元のRDDを元に戻してから収集していましたが、私はまだこの奇妙な動作を観察しています。私は、java.Mathライブラリから静的メソッドを渡すことでシリアル化する際に問題が生じるかもしれないと考えていましたが、Spark Quick Start TutorialMath.maxreducerに使用しているようです。

アイデア?

はありがとう

EDIT:

追加情報:このスニペットは、複数の反復を持っているより大きなプログラムの一部であり、それは各反復で呼ばれています。

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

:私はこのような行のpopulationWithFitness.values().collect().toString()三回を印刷するとき

:最初の反復がreducerから製造maxValueが正しい値であるが、他のすべての反復が奇妙な結果を

EDIT2を生産している正しい結果を生成します

Generation 1 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
After Reduce: -3054649 
Generation 2 
[-3084310, -3931687, -3508984, -3054649] 
[-3084310, -3847178, -3508984, -2701881] 
[-3148206, -3984035, -2806859, -2989184] 
After Reduce: -2949478 
Generation 3 
[-3187591, -3984035, -3696853, -3054649] 
[-3187591, -3984035, -3178920, -3015411] 
[-3148206, -3804759, -3657984, -2701881] 
After Reduce: -2710313 
Generation 4 
[-3187591, -2982220, -3310753, -3054649] 
[-3148206, -2985628, -3657984, -2701881] 
[-3148206, -2706580, -3451228, -2989184] 
After Reduce: -2692651 
. 
. 
. 

fiのように出力が表示されます。最初の繰り返しはうまくいきますが、次のすべての反復では奇妙な出力が生成されます。私は問題があると思っています。遅延評価と私がコールすると、変換は適用されませんが、わかりません。

私はまたJavaDoubleRDDreduce(Math::max)を交換しようとしたと、このJavaDoubleRDDmaxと呼ばれるが、結果は同じであった:

JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2()); 
long currentMaxFitness = stats.max().longValue(); 

私はパラメータでそれを実行しているローカルモードでこのコードをテストしていもう一つの重要なポイント:

spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt 
+0

これは実際のコードですか? –

+0

はい、 'map()'を適用する 'sampleRdd'は、マッピングの後に' Long'値を生成するオブジェクトのコレクションです。他の部分は、わかりやすいように、置き換えられた変数名で私のコードのスニペットです。提供されたサンプル出力も実際の出力です。 – MichaelDD

+0

あなたのrddのソースは何ですか? –

答えて

1

これはおそらくevaluateFitness(isl, fitnessCalculator)のどこかで発生している可能性があります(99%)。何らかの種類の再現性のないソースを使用しているため、異なる1つの異なる実行結果を返すようです。スパークは怠惰であり、実行はそれぞれの連続するアクションで再実行されることを忘れないでください。これを助けるためにキャッシュを使うことはできますが、それでも失敗する可能性があります(ノードに障害が発生したり、データがキャッシュから脱落してしまいます)。あなたはここでチェックポイントを使うことが最善の策ですが、実行自体を変更して冪等でなければならないことさえあります。

+0

ありがとうございました。 'キャッシュ'は私のために働く。あなたは 'determinFitness'ではなく、' selection'と 'crossover'関数(私は遺伝的アルゴリズムについて言及している変数命名法から推測していると思います)にいくつかのランダム性因子を必要とする私は実装ビットを変更することはできませんので、動作するように。私はマップの段階を連鎖させ、最高の解を得るために最大の適応度と最終的な母集団についてのみ 'reduce'を呼び出します。私はHadoopで実装したいくつかの研究論文をたどりましたが、Spark Lazy evalは少し難解です。 – MichaelDD

関連する問題