2017-01-30 9 views
1

を使用してスパークで()メソッドを除いて、私はcolum xcept(filteredDuplicates)を持つテーブルがあるdropDuplicatesに関する問題()とスカラ座

私はスパークシェルでこれを実行すると、予想通り、それは完全に正常に動作しています。しかし、スパーク送信では、重複削除はソートされた順序ではありません(すなわち、seq_no 3は有効なフレームにあり、1,5は拒否されたフレームにあります)。また、except()もspark submitで問題を抱えています。私はこれで一日中立ち往生しています。誰か助けてください 皆さんありがとうございます

+2

私はdropDuplicates'は、重複のグループごと_first_の記録を保持するために、任意の保証を提供し 'とは思わない - 少なくとも[ドキュメント]には、このような保証はありません(https://spark.apache.org/ docs/2.1.1/api/scala/index.html#[email protected]())。私はそれがSpark Shellで "偶然"しか動作しないと思います(1つのパーティションしかありません)。別のアプローチ、たとえば'groupBy'を使って –

+0

あなたの返事をありがとう。今私はgroup byを使ってこれに対する解決策を得ました。しかし、それでも何が問題なのか正確にはわからない。ドキュメンテーションが改善されればより良いでしょう –

答えて

0

次のメソッドは、spark-sqlが提供するrow_number関数を使用して、有効なデータフレームと無効なデータフレームを生成します。私はcassandraへのアクセスを持っていないので、ここでは単純なDataframeを使用しています。

import sqlContext.implicits._ 
val df = sc.parallelize(Seq(("a" -> 1), ("b" -> 2), ("c" -> 3), ("d" -> 4), ("a" -> 5), ("a" -> 6), ("c" -> 7), ("c" -> 8))).toDF("c1", "c2") 

df.registerTempTable("temp_table") 

val masterdf = sqlContext.sql("SELECT c1, c2, ROW_NUMBER() OVER(PARTITION BY c1 ORDER BY c2) as row_num FROM temp_table") 

masterdf.filter("row_num = 1").show() 
+---+---+-------+ 
| c1| c2|row_num| 
+---+---+-------+ 
| a| 1|  1| 
| b| 2|  1| 
| c| 3|  1| 
| d| 4|  1| 
+---+---+-------+ 


masterdf.filter("row_num > 1").show() 
+---+---+-------+ 
| c1| c2|row_num| 
+---+---+-------+ 
| a| 5|  2| 
| a| 6|  3| 
| c| 7|  2| 
| c| 8|  3| 
+---+---+-------+ 
+0

あなたの返事をありがとう。私の問題はあなたの答えとは何とか違いますが、とにかくこの問題の解決策を得ました –