2016-03-31 13 views
0

私はスパークのデータフレームを作成しています。グループの行をグループ化し、グループ化された行の値を新しい列として要素の配列に変換する必要があります。 例:スパークデータフレームの列配列に行の値を変換する

Input: 

employee | Address 
------------------ 
Micheal | NY 
Micheal | NJ 

Output: 

employee | Address 
------------------ 
Micheal | (NY,NJ) 

ご協力いただきありがとうございます。

+0

はそうします。 – Manas

+0

@Manasこれは私が得るエラーですgroupByKeyはorg.apache.spark.sql.DataFrameのメンバーではありません – vds

+2

私たちにあなたのコードを教えて..... –

答えて

5
ここ

sqlContext.createDataFrame()

を使用して、私は変換のためのRDDにデータフレームを変換し、データフレームを、それを逆変換している代替ソリューション あるSample.json

{"employee":"Michale","Address":"NY"} 
{"employee":"Michale","Address":"NJ"} 
{"employee":"Sam","Address":"NY"} 
{"employee":"Max","Address":"NJ"} 

スパークアプリケーション

val df = sqlContext.read.json("sample.json") 

// Printing the original Df 
df.show() 

//Defining the Schema for the aggregated DataFrame 
val dataSchema = new StructType(
    Array(
    StructField("employee", StringType, nullable = true), 
    StructField("Address", ArrayType(StringType, containsNull = true), nullable = true) 
) 
) 
// Converting the df to rdd and performing the groupBy operation 
val aggregatedRdd: RDD[Row] = df.rdd.groupBy(r => 
      r.getAs[String]("employee") 
     ).map(row => 
      // Mapping the Grouped Values to a new Row Object 
      Row(row._1, row._2.map(_.getAs[String]("Address")).toArray) 
     ) 

// Creating a DataFrame from the aggregatedRdd with the defined Schema (dataSchema) 
val aggregatedDf = sqlContext.createDataFrame(aggregatedRdd, dataSchema) 

// Printing the aggregated Df 
aggregatedDf.show() 

出力:

+-------+--------+---+ 
|Address|employee|num| 
+-------+--------+---+ 
|  NY| Michale| 1| 
|  NJ| Michale| 2| 
|  NY|  Sam| 3| 
|  NJ|  Max| 4| 
+-------+--------+---+ 

+--------+--------+ 
|employee| Address| 
+--------+--------+ 
|  Sam| [NY]| 
| Michale|[NY, NJ]| 
|  Max| [NJ]| 
+--------+--------+ 
+0

この回答はOKですが、RDD APIを使用するとDataFrame APIを大幅に使用するよりも遅くなります(クエリオプティマイザとタングステンの不足のため) – tribbloid

-1

あなたがしようとするとGROUPBY、その後、ピボット使用することができます:あなたはあなたの反復処理可能[アドレス]を与えるであろう、あなたが望む結果を得るためにgroupByKeyを使用することができますように

val dfPivot = df.groupBy("employee").pivot("Address").max() 
関連する問題