2016-08-26 5 views
-1

Iは以下、2つの列を有し、双方を連結するには、データの小さなセットである:sparkの1列のレコードを連結する方法は?

ID   Comments 

32412  CLOSE AS NORMAL 
32412  UNDER REVIEW 

私は、これは以下のように来ることにしたいので、基本的にビューは、IDによってグループ化やコメントを連結することです。

ID  Comments 

32412 CLOSE AS NORMAL 
     UNDER REVIEW 
+0

実際にあなたが何を求めているのかよく分かりません。そして何か試しましたか? (私はあなたにdownvoteをしなかったが、私は不明であるためにこの質問を閉じるために投票している) – eliasah

答えて

1

これにはUDF(ユーザー定義関数)を使用できます。

import scala.collection.mutable 
sqlContext.udf.register("ArrayToString",(a: mutable.WrappedArray[String]) => a.mkString("\n")) 
df.registerTempTable("IDsAndComments") 
val new_df = sqlContext.sql("WITH Data AS (SELECT ID, collect_list(Comments) AS cmnts FROM IDsAndComments GROUP BY ID) SELECT ID, ArrayToString(cmnts) AS Comments FROM Data") 

何ここで起こることは、それがSQLコードを解析するときsqlContextで使用するためには、新しい関数を定義することです:あなたはデータとdfという名前DataFrameを持っていると仮定すると、あなたはこのような何かを試すことができます。この関数はWrappedArray(SparkのDataFramesから得られる配列の型です)をとり、配列のすべての要素が新しい行で区切られた文字列に変換します。

collect_listは、グループ化された値の配列を返す関数です。あなたはSQLクエリを使用せずにこれを行うにはHiveContext

2

別の方法であるためにあなたのsqlContextを必要とするので、それは、HiveContext機能だということに注意してください:

import scala.collection.mutable 

val myUDF = udf[String, mutable.WrappedArray[String]](_.mkString(" ")) 
df.groupBy($"id") 
    .agg(collect_list("comments").as("comments")) 
    .withColumn("comments", myUDF($"comments")) 
    .show() 

それだけでなく、SQLContextとしてHiveContextが必要です。

関連する問題