2016-09-01 6 views
3

sparkとscalaの新機能です。キー値のペアであるリストのデータフレームがあるとします。列IDのid変数を新しい列としてマップする方法はありますか?新しい列としてキーを使用したキーと値のペアのスパークデータフレームを整形する

df.show() 
+--------------------+-------------------- + 
| ids    | vals     | 
+--------------------+-------------------- + 
|[id1,id2,id3]  | null     | 
|[id2,id5,id6]  |[WrappedArray(0,2,4)] | 
|[id2,id4,id7]  |[WrappedArray(6,8,10)]| 

予想される出力は:

+----+----+ 
|id1 | id2| ... 
+----+----+ 
|null| 0 | ... 
|null| 6 | ... 

答えて

3

可能な方法は、新たなデータフレームの列を計算し、行を構築するために、これらの列を使用することです。

import org.apache.spark.sql.functions._ 

val data = List((Seq("id1","id2","id3"),None),(Seq("id2","id4","id5"),Some(Seq(2,4,5))),(Seq("id3","id5","id6"),Some(Seq(3,5,6)))) 

val df = sparkContext.parallelize(data).toDF("ids","values") 

val values = df.flatMap{ 
    case Row(t1:Seq[String], t2:Seq[Int]) => Some((t1 zip t2).toMap) 
    case Row(_, null) => None 
} 

// get the unique names of the columns across the original data 
val ids = df.select(explode($"ids")).distinct.collect.map(_.getString(0)) 

// map the values to the new columns (to Some value or None) 
val transposed = values.map(entry => Row.fromSeq(ids.map(id => entry.get(id)))) 

// programmatically recreate the target schema with the columns we found in the data 
import org.apache.spark.sql.types._ 
val schema = StructType(ids.map(id => StructField(id, IntegerType, nullable=true))) 

// Create the new DataFrame 
val transposedDf = sqlContext.createDataFrame(transposed, schema) 

バッキングデータソースに応じて、カラム名を計算することはかなり安価であることができるが、このプロセスは、データを介して2回通過します。

また、これはDataFramesRDDの間を行き来します。私は "純粋な" DataFrameプロセスを見ることに興味があります。

+0

ありがとうございました!私は問題を単純化し、一般的な解決策を必要とせずに簡単な方法で解決することができます(データをフィルタリングして、すべての行で同じIDを持つDFにしました)。 – johnblund

+0

最後に私はあなたの解決策を必要としました。しかし、私は '警告:非可変型の引数String型パターンSeq [String](Seq [String]の根底にある)は消去されて消去されているのでチェックされていません。 – johnblund

+0

@johnblundこれは、型の削除によるJVMの制限です。警告を回避するには、実行時にオブジェクト(上記のt1、t2)を型にキャストする必要があります。しかし、ここで型を宣言すると、コンパイラがコードを静的に検証するのに役立ち、正しいことが分かります。 – maasg

関連する問題