2016-11-09 3 views
1

spark scalaで2つのデータフレームをテーブルとして登録しています。この2つのテーブルから spark sqlで2つのデータフレームを変換する

表1:

+-----+--------+ 
    |id |values | 
    +-----+----- + 
    | 0 | v1 | 
    | 0 | v2 | 
    | 1 | v3 | 
    | 1 | v1 | 
    +-----+----- + 

表2:

+-----+----+--- +----+ 
    |id |v1 |v2 | v3 
    +-----+-------- +----+ 
    | 0 | a1| b1| - | 
    | 1 | a2| - | c2 | 

    +-----+---------+----+ 

は、私は上記の2つのテーブルを使用して新しいテーブルを生成したいです。

表3:

+-----+--------+--------+ 
    |id |values | field | 
    +-----+--------+--------+ 
    | 0 | v1 | a1  | 
    | 0 | v2 | b1  | 
    | 1 | v3 | c2  | 
    | 1 | v1 | a2  | 
    +-----+--------+--------+ 

ここで、V1は、私がScalaでスパークSQLを使用していますフォーム

v1: struct (nullable = true) 
    | |-- level1: string (nullable = true) 
    | |-- level2: string (nullable = true) 
    | |-- level3: string (nullable = true) 
    | |-- level4: string (nullable = true) 
    | |-- level5: string (nullable = true) 

です。

いくつかのSQLクエリを書くか、データフレームにいくつかのスパーク関数を使用することで、目的を達成できますか?ここで

+0

私はINSERT INTO table1(field)、VALUES(SELECT column_name FROM table2 where table1.id = table2.id)を試していましたが、これはtable1からcolumn_nameを動的に必要とします – satyambansal117

+0

はテーブル2の有限の列名のリストですか? –

+0

カラム名の数はあらかじめわかっていませんが、カラム2の別個の値と同じ番号です。 – satyambansal117

答えて

1

は、この出力を生成します。つまり、あなたが使用できるサンプルコードです:

コードは次のようになります。コンソール上でテストしている間

val df1=sc.parallelize(Seq((0,"v1"),(0,"v2"),(1,"v3"),(1,"v1"))).toDF("id","values") 
val df2=sc.parallelize(Seq((0,"a1","b1","-"),(1,"a2","-","b2"))).toDF("id","v1","v2","v3") 
val joinedDF=df1.join(df2,"id") 
val resultDF=joinedDF.rdd.map{row=> 
val id=row.getAs[Int]("id") 
val values=row.getAs[String]("values") 
val feilds=row.getAs[String](values) 
(id,values,feilds) 
}.toDF("id","values","feilds") 

scala> val df1=sc.parallelize(Seq((0,"v1"),(0,"v2"),(1,"v3"),(1,"v1"))).toDF("id","values") 
df1: org.apache.spark.sql.DataFrame = [id: int, values: string] 

scala> df1.show 
+---+------+ 
| id|values| 
+---+------+ 
| 0| v1| 
| 0| v2| 
| 1| v3| 
| 1| v1| 
+---+------+ 


scala> val df2=sc.parallelize(Seq((0,"a1","b1","-"),(1,"a2","-","b2"))).toDF("id","v1","v2","v3") 
df2: org.apache.spark.sql.DataFrame = [id: int, v1: string ... 2 more fields] 

scala> df2.show 
+---+---+---+---+ 
| id| v1| v2| v3| 
+---+---+---+---+ 
| 0| a1| b1| -| 
| 1| a2| -| b2| 
+---+---+---+---+ 


scala> val joinedDF=df1.join(df2,"id") 
joinedDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 3 more fields] 

scala> joinedDF.show 
+---+------+---+---+---+               
| id|values| v1| v2| v3| 
+---+------+---+---+---+ 
| 1| v3| a2| -| b2| 
| 1| v1| a2| -| b2| 
| 0| v1| a1| b1| -| 
| 0| v2| a1| b1| -| 
+---+------+---+---+---+ 


scala> val resultDF=joinedDF.rdd.map{row=> 
    | val id=row.getAs[Int]("id") 
    | val values=row.getAs[String]("values") 
    | val feilds=row.getAs[String](values) 
    | (id,values,feilds) 
    | }.toDF("id","values","feilds") 
resultDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 1 more field] 

scala> 

scala> resultDF.show 
+---+------+------+                
| id|values|feilds| 
+---+------+------+ 
| 1| v3| b2| 
| 1| v1| a2| 
| 0| v1| a1| 
| 0| v2| b1| 
+---+------+------+ 

これがあなたの問題かもしれないことを願っています。ありがとう!

+0

もし私の値が文字列の代わりにstruct型であれば、そのための変更は – satyambansal117

+1

です。次に、文字列の代わりに構造体を使用して抽出し、それに応じて変換します。 –

+0

助けてくれてありがとう、ここv1、v2、v3はレベル1のサブフィールドを持つstruct型です:string、level2:string。getAs [構造体](値)を試してみると構造型が見つかりません....これを行うには – satyambansal117

関連する問題