私はスパークジョブを作成しました。以下のようになります:スパークが大量のデータをシャッフルしています
public class TestClass {
public static void main(String[] args){
String masterIp = args[0];
String appName = args[1];
String inputFile = args[2];
String output = args[3];
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<String> rdd = sparkContext.textFile(inputFile);
Integer[] keyColumns = new Integer[] {0,1,2};
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns);
Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1);
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2;
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2;
JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = -6293440291696487370L;
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] record = t.split(",");
Integer[] keyColumns = broadcastJob.value();
StringBuilder key = new StringBuilder();
for (int index = 0; index < keyColumns.length; index++) {
key.append(record[keyColumns[index]]);
}
key.append("|id=1");
Integer value = new Integer(record[4]);
return new Tuple2<String, Integer>(key.toString(),value);
}}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2);
pairRDD.saveAsTextFile(output);
}
}
プログラムは、各キーの値の合計を計算します。 私の理解によれば、ローカルコンバイナは各ノード上で実行し、同じキーの値を加算して、 を入れて、少量のデータでシャッフルする必要があります。 しかし、SparkUIでは、膨大な量のシャッフル読み込みとシャッフル書き込み(ほぼ58GB)を表示しています。 何か間違っていますか? ローカルコンバイナが動作しているかどうかを確認するには?
クラスタ詳細: -
20ノードクラスタ
80ギガバイトのハードディスク、8ギガバイトRAM、4つのコアを有する各ノード
のHadoop-2.7.2
火花2.0.2(事前に作成-WITH-Hadoopの-2.7.35ディストリビューション)
入力ファイルの詳細: - :記録の400ギガバイト
が 番号:16129999990
レコード列:文字列
入力ファイルがHDFS
入力ファイルサイズに保存されています(2 char)、文字列(2文字)
注: 最大別のキーの数は1081600です(2 char)、int、int、
スパークログでは、localitylevel NODE_LOCALで実行中のタスクが表示されます。
combineByKeyをreduceByKeyとしているのはなぜですか? ReduceByKeyはあなたの例のように、コンバイナーとしてレデューサーを使用します –