Iは以下、2つの列を有し、双方を連結するには、データの小さなセットである:sparkの1列のレコードを連結する方法は?
ID Comments
32412 CLOSE AS NORMAL
32412 UNDER REVIEW
私は、これは以下のように来ることにしたいので、基本的にビューは、IDによってグループ化やコメントを連結することです。
ID Comments
32412 CLOSE AS NORMAL
UNDER REVIEW
Iは以下、2つの列を有し、双方を連結するには、データの小さなセットである:sparkの1列のレコードを連結する方法は?
ID Comments
32412 CLOSE AS NORMAL
32412 UNDER REVIEW
私は、これは以下のように来ることにしたいので、基本的にビューは、IDによってグループ化やコメントを連結することです。
ID Comments
32412 CLOSE AS NORMAL
UNDER REVIEW
これにはUDF(ユーザー定義関数)を使用できます。
import scala.collection.mutable
sqlContext.udf.register("ArrayToString",(a: mutable.WrappedArray[String]) => a.mkString("\n"))
df.registerTempTable("IDsAndComments")
val new_df = sqlContext.sql("WITH Data AS (SELECT ID, collect_list(Comments) AS cmnts FROM IDsAndComments GROUP BY ID) SELECT ID, ArrayToString(cmnts) AS Comments FROM Data")
何ここで起こることは、それがSQLコードを解析するときsqlContext
で使用するためには、新しい関数を定義することです:あなたはデータとdf
という名前DataFrame
を持っていると仮定すると、あなたはこのような何かを試すことができます。この関数はWrappedArray
(SparkのDataFramesから得られる配列の型です)をとり、配列のすべての要素が新しい行で区切られた文字列に変換します。
collect_list
は、グループ化された値の配列を返す関数です。あなたはSQLクエリを使用せずにこれを行うにはHiveContext
別の方法であるためにあなたのsqlContext
を必要とするので、それは、HiveContext
機能だということに注意してください:
import scala.collection.mutable
val myUDF = udf[String, mutable.WrappedArray[String]](_.mkString(" "))
df.groupBy($"id")
.agg(collect_list("comments").as("comments"))
.withColumn("comments", myUDF($"comments"))
.show()
それだけでなく、SQLContext
としてHiveContext
が必要です。
実際にあなたが何を求めているのかよく分かりません。そして何か試しましたか? (私はあなたにdownvoteをしなかったが、私は不明であるためにこの質問を閉じるために投票している) – eliasah