のSpark reduce
機能を使用すると予期しない動作が発生しました。ここではサンプルコードがあります:Apache Spark Reduce java.lang.Math.maxによる予期しない動作
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
上記のコードは複数回呼び出されると、それはほとんどの時間は、このような予期せぬ結果を生成します:あなたは減速が値を生成見ることができるように
[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect
After Reduce: -2392513 //value produced by reducer
を-2392513
ただし、この値がさえませんRDDの印刷された値と比較した場合のRDD。それはなぜです? collect()
はreduce()
に影響しますか?私は元のRDDを元に戻してから収集していましたが、私はまだこの奇妙な動作を観察しています。私は、java.Math
ライブラリから静的メソッドを渡すことでシリアル化する際に問題が生じるかもしれないと考えていましたが、Spark Quick Start TutorialもMath.max
をreducer
に使用しているようです。
アイデア?
はありがとうEDIT:
追加情報:このスニペットは、複数の反復を持っているより大きなプログラムの一部であり、それは各反復で呼ばれています。
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
:私はこのような行のpopulationWithFitness.values().collect().toString()
三回を印刷するとき
:最初の反復がreducer
から製造maxValue
が正しい値であるが、他のすべての反復が奇妙な結果を
EDIT2を生産している正しい結果を生成します
Generation 1
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
After Reduce: -3054649
Generation 2
[-3084310, -3931687, -3508984, -3054649]
[-3084310, -3847178, -3508984, -2701881]
[-3148206, -3984035, -2806859, -2989184]
After Reduce: -2949478
Generation 3
[-3187591, -3984035, -3696853, -3054649]
[-3187591, -3984035, -3178920, -3015411]
[-3148206, -3804759, -3657984, -2701881]
After Reduce: -2710313
Generation 4
[-3187591, -2982220, -3310753, -3054649]
[-3148206, -2985628, -3657984, -2701881]
[-3148206, -2706580, -3451228, -2989184]
After Reduce: -2692651
.
.
.
fiのように出力が表示されます。最初の繰り返しはうまくいきますが、次のすべての反復では奇妙な出力が生成されます。私は問題があると思っています。遅延評価と私がコールすると、変換は適用されませんが、わかりません。
私はまたJavaDoubleRDD
でreduce(Math::max)
を交換しようとしたと、このJavaDoubleRDD
にmax
と呼ばれるが、結果は同じであった:
JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2());
long currentMaxFitness = stats.max().longValue();
私はパラメータでそれを実行しているローカルモードでこのコードをテストしていもう一つの重要なポイント:
spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt
これは実際のコードですか? –
はい、 'map()'を適用する 'sampleRdd'は、マッピングの後に' Long'値を生成するオブジェクトのコレクションです。他の部分は、わかりやすいように、置き換えられた変数名で私のコードのスニペットです。提供されたサンプル出力も実際の出力です。 – MichaelDD
あなたのrddのソースは何ですか? –