可能な方法は、新たなデータフレームの列を計算し、行を構築するために、これらの列を使用することです。
import org.apache.spark.sql.functions._
val data = List((Seq("id1","id2","id3"),None),(Seq("id2","id4","id5"),Some(Seq(2,4,5))),(Seq("id3","id5","id6"),Some(Seq(3,5,6))))
val df = sparkContext.parallelize(data).toDF("ids","values")
val values = df.flatMap{
case Row(t1:Seq[String], t2:Seq[Int]) => Some((t1 zip t2).toMap)
case Row(_, null) => None
}
// get the unique names of the columns across the original data
val ids = df.select(explode($"ids")).distinct.collect.map(_.getString(0))
// map the values to the new columns (to Some value or None)
val transposed = values.map(entry => Row.fromSeq(ids.map(id => entry.get(id))))
// programmatically recreate the target schema with the columns we found in the data
import org.apache.spark.sql.types._
val schema = StructType(ids.map(id => StructField(id, IntegerType, nullable=true)))
// Create the new DataFrame
val transposedDf = sqlContext.createDataFrame(transposed, schema)
バッキングデータソースに応じて、カラム名を計算することはかなり安価であることができるが、このプロセスは、データを介して2回通過します。
また、これはDataFrames
とRDD
の間を行き来します。私は "純粋な" DataFrameプロセスを見ることに興味があります。
ありがとうございました!私は問題を単純化し、一般的な解決策を必要とせずに簡単な方法で解決することができます(データをフィルタリングして、すべての行で同じIDを持つDFにしました)。 – johnblund
最後に私はあなたの解決策を必要としました。しかし、私は '警告:非可変型の引数String型パターンSeq [String](Seq [String]の根底にある)は消去されて消去されているのでチェックされていません。 – johnblund
@johnblundこれは、型の削除によるJVMの制限です。警告を回避するには、実行時にオブジェクト(上記のt1、t2)を型にキャストする必要があります。しかし、ここで型を宣言すると、コンパイラがコードを静的に検証するのに役立ち、正しいことが分かります。 – maasg