2016-04-07 10 views
2

SPARK DataFrame APIを使用してIDでグループ化し、グループ内のすべての値の組み合わせを計算し、単一の出力データフレームを生成するにはどうすればよいですか?Spark Dataframe API:IDによるグループ化と計算の組み合わせ

val testSchema = StructType(Array(
    StructField("id", IntegerType), 
    StructField("value", StringType))) 

val test_rows = Seq(
    Row(1, "a"), 
    Row(1, "b"), 
    Row(1, "c"), 
    Row(2, "a"), 
    Row(2, "d"), 
    Row(2, "e") 
) 
val test_rdd = sc.parallelize(test_rows) 
val test_df = sqlContext.createDataFrame(test_rdd, testSchema) 

期待出力:これまで

1 a b 
1 a c 
1 b c 
2 a d 
2 a e 
2 d e 

ベストソリューション:参加

実行し、自己、IDの平等に関するフィルタ(B)および(b、a)の結合を実行しながら、例えば、重複セットを排除する方法:と等しい値に

val result = test_df.join(
    test_df.select(test_df.col("id").as("r_id"), test_df.col("value").as("r_value")), 
    ($"id" === $"r_id") and ($"value" !== $"r_value")).select("id", "value", "r_value") 


+---+-----+-------+ 
| id|value|r_value| 
+---+-----+-------+ 
| 1| a|  b| 
| 1| a|  c| 
| 1| b|  a| 
| 1| b|  c| 
| 1| c|  a| 
| 1| c|  b| 
| 2| a|  d| 
| 2| a|  e| 
| 2| d|  a| 
| 2| d|  e| 
| 2| e|  a| 
| 2| e|  d| 
+---+-----+-------+ 

残りの問題を解消?

+1

この場合、 'DataFame'の代わりに' RDD'を使う方が良いでしょう。この[Spark DataFrame Aggregation Function](http://stackoverflow.com/questions/33899977/spark-dataframe-custom-aggregation-ベクトルの和を関数の和にする)、なぜそれに気付くでしょう。 –

答えて

1

値フィールドのオブジェクトを注文していますか?もしそうなら、IDが同じで、左のテーブルの値が右のテーブルの値より小さいことを要求しながら、データフレームに自分自身だけで結合できるようです。

[編集]注文がなく、IDごとの値が十分にない場合は、groupByKeyを使用して、結果のシーケンスからすべての組み合わせを作成します。これは、すべてを作成するよりも簡単に行うことができます半分しか保持しない。たとえば、Scalaを使用している場合、Seqcombinationファンクション[doc]は、必要な処理を実行すると信じています)。これは、ほとんどのデータセットの自己結合アプローチよりもはるかに悪い結果になります。

+0

残念ながら、値は数値ではないので、後処理ステップで重複するセットを排除する必要があります。 – behas

+0

@behas:注文のために数字である必要はありません。たとえば、文字列の場合は、比較のために使用できる辞書式の順序があります。 '' a "<" b "'は 'true'に解決され、' 'b" <"a" 'は' false'に解決されます。より複雑な構造のオブジェクトの場合は、ユニークなidsや文字列ラベルを比較することもできます。 –

+0

いずれかの辞書編集順がありません...値はハッシュです – behas

関連する問題