2017-03-08 4 views
1

私はスパークジョブを作成しました。以下のようになります:スパークが大量のデータをシャッフルしています

public class TestClass { 

public static void main(String[] args){ 
String masterIp = args[0]; 
String appName = args[1]; 
String inputFile = args[2]; 
String output = args[3]; 
SparkConf conf = new SparkConf().setMaster(masterIp).setAppName(appName); 
JavaSparkContext sparkContext = new JavaSparkContext(conf); 
JavaRDD<String> rdd = sparkContext.textFile(inputFile); 
Integer[] keyColumns = new Integer[] {0,1,2}; 
Broadcast<Integer[]> broadcastJob = sparkContext.broadcast(keyColumns); 

Function<Integer,Long> createCombiner = v1 -> Long.valueOf(v1); 
Function2<Long, Integer, Long> mergeValue = (v1,v2) -> v1+v2; 
Function2<Long, Long, Long> mergeCombiners = (v1,v2) -> v1+v2; 

JavaPairRDD<String, Long> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() { 
     private static final long serialVersionUID = -6293440291696487370L; 
     @Override 
     public Tuple2<String, Integer> call(String t) throws Exception { 
     String[] record = t.split(","); 
     Integer[] keyColumns = broadcastJob.value(); 
     StringBuilder key = new StringBuilder(); 
     for (int index = 0; index < keyColumns.length; index++) { 
      key.append(record[keyColumns[index]]); 
     } 
     key.append("|id=1"); 
     Integer value = new Integer(record[4]); 
     return new Tuple2<String, Integer>(key.toString(),value); 
     }}).combineByKey(createCombiner, mergeValue, mergeCombiners).reduceByKey((v1,v2) -> v1+v2); 
     pairRDD.saveAsTextFile(output); 
    } 
} 

プログラムは、各キーの値の合計を計算します。 私の理解によれば、ローカルコンバイナは各ノード上で実行し、同じキーの値を加算して、 を入れて、少量のデータでシャッフルする必要があります。 しかし、SparkUIでは、膨大な量のシャッフル読み込みとシャッフル書き込み(ほぼ58GB)を表示しています。 何か間違っていますか? ローカルコンバイナが動作しているかどうかを確認するには?

クラスタ詳細: -
20ノードクラスタ
80ギガバイトのハードディスク、8ギガバイトRAM、4つのコアを有する各ノード
のHadoop-2.7.2
火花2.0.2(事前に作成-WITH-Hadoopの-2.7.35ディストリビューション)

入力ファイルの詳細: - :記録の400ギガバイト
が 番号:16129999990
レコード列:文字列
入力ファイルがHDFS
入力ファイルサイズに保存されています(2 char)、文字列(2文字)

注: 最大別のキーの数は1081600です(2 char)、int、int、
スパークログでは、localitylevel NODE_LOCALで実行中のタスクが表示されます。

enter image description here

+0

combineByKeyをreduceByKeyとしているのはなぜですか? ReduceByKeyはあなたの例のように、コンバイナーとしてレデューサーを使用します –

答えて

0

のは、この問題を分解し得るか見てみましょう。計算を簡略化するためにはそれを想定することができます:

    レコードの
  • 総数がユニークキーの1.6e8
  • 数が
  • 分割サイズは128メガバイトである1E6である(これはあなたのUIでのタスクの数と一致しているようです)。

これらの値を使用すると、データは約3200個のパーティションに分割されます(ケースでは3125個)。これにより、スプリットあたり約51200レコードが得られます。さらに、キーあたりの値の数の分布が均一であれば、平均して1つのキーにつき〜160レコードが必要です。

データがランダムに分散されている場合(キーでソートされていないなど)、パーティションあたりのキーあたりの平均レコード数が1に近づくことが期待できます*。これは基本的にマップ側の結合がデータの量をまったく減らさない最悪のシナリオです。

さらに、フラットファイルのサイズは通常、シリアル化されたオブジェクトのサイズよりも大幅に小さくなることを覚えておく必要があります。

実際のデータでは、データ収集プロセスから何らかの種類の注文が出ることが予想されるため、上記で計算したものより優れているはずですが、結論として、データがパーティションでグループ化されていない場合は、改善を全くもたらさないかもしれない。

シャッフルされたデータの量を少し大きくすることで、おそらくパーティションを分割する(256MBで1パーティションあたり100Kを少し上回る)ことができますが、これは長いGC休止やその他のGC問題の代償です。

import pandas as pd 
import numpy as np 

(pd 
    .DataFrame({"x": np.random.choice(np.arange(3200), size=160, replace=True)}) 
    .groupby("x") 
    .x.count() 
    .mean()) 

か、単にランダムに3200個のバケットに160球を割り当てる問題を考えると:あなたが交換にサンプルを採取することによって、これをシミュレートすることができますいずれか*


関連する問題