2016-03-21 11 views
4

とHDFSのパーティショニングを活用していないスパークは、私は、次のコマンドを使用してHDFSに寄木細工のファイルを書いています寄木細工

val file = sqlContext.read.parquet(folder) 
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1), 
    r.getLong(2), r.getString(3))) 

val filteredData = data.filter(x => x.thingId.equals("1")) 
filteredData.collect() 

を私は期待Sparkはファイルの分割を利用し、 "thingId = 1"のパーティションだけを読み込むことになります。 実際、Sparkはフィルタリングされたもの(thingId = 1のパーティション)だけでなく、ファイルのすべてのパーティションを読み込みます。

16/03/21午前1時32分33秒INFOのParquetRelation: HDFSから寄せ木ファイル(複数可)を読む://サンドボックス私は、ログを見ると は、私はそれが読まないことをすべてを見ることができます。 hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:寄木細工のファイルを読む hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation:読み込み寄木細工ファイル hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01 :32:33 I NFO ParquetRelation:寄木細工ファイルを読み込み からhdfs://sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33情報ParquetRelation:寄木細工のファイルを読み込み からhdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7- 189c30ca3fe3.gz.parquet 16/03/21 01:32:33情報ParquetRelation:寄木細工ファイルを から読むhdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02 -9a21-4915-89a7-189c30ca3fe3.gz.parquet

紛失しているものはありますか?ドキュメントを見ると、Sparkはフィルターに基づいて、thingID = 1のパーティションのみを読み取る必要があることを認識する必要があります。 誰もあなたのアイデアを持っていますか?

答えて

5

いくつかの問題は、(入力フォーマットレベルでフィルタを使用してIE)述語を「プッシュダウン」に成功からスパークを防ぐことがあります。

  1. フィルタプッシュダウンがOFFです:スパークのバージョンに応じて、あなたが使用しています述部プッシュダウンオプション(spark.sql.parquet.filterPushdown)がオフになっている可能性があります。これは、Spark 1.5.0のように、デフォルトでONです - ので、あなたのバージョンや設定を確認

  2. フィルタは「不透明」である:これは、ここでケースのようです:あなたは寄木細工のファイルをロードしている、にそれぞれの行をマッピング別の行(列の並べ替え?)、関数を受け入れるfilterメソッドを使用します。 Sparkはファンクションコードを「読む」ことができず、パーティション化カラムでの比較を使用することを認識しています。これはSparkにとって、すべてのチェックを行うことができるRow => Boolean関数です...

    フィルターのプッシュダウンを機能させるには、元の構造から「切り離された」ものにレコードをマッピングする前に使用し、filterオーバーロードのうちの1つを使用する必要があります。例:

    // assuming the relevant column name is "id" in the parquet structure 
    val filtered = file.filter("id = 1") 
    
    // or: 
    val filtered = file.filter(col("id") === 1) 
    
    // and only then: 
    val data = filtered.map(r => Row(...)) 
    
+0

どうもありがとうTzachゾハル:)問題は、あなたがオプション2、不透明なフィルタで説明したまったく同じだったので、私は、スパーク1.5.0を使用しています。行をマッピングする前にフィルタを実行すると、動作します。どうもありがとう! – AlexL

関連する問題