2016-04-28 5 views
4

データフレームでSpark SQLを使用しています。私は入力データフレームを持っており、より多くの列を持つより大きなデータフレームにその行を追加(または挿入)したいと思います。どうすればいい?Spark SQL:(別のテーブルの)データフレームテーブルに新しい行を追加する方法

これがSQLの場合は、INSERT INTO OUTPUT SELECT ... FROM INPUTを使用しますが、Spark SQLでこれを行う方法はわかりません。具体性のために

var input = sqlContext.createDataFrame(Seq(
     (10L, "Joe Doe", 34), 
     (11L, "Jane Doe", 31), 
     (12L, "Alice Jones", 25) 
     )).toDF("id", "name", "age") 

var output = sqlContext.createDataFrame(Seq(
     (0L, "Jack Smith", 41, "yes", 1459204800L), 
     (1L, "Jane Jones", 22, "no", 1459294200L), 
     (2L, "Alice Smith", 31, "", 1459595700L) 
     )).toDF("id", "name", "age", "init", "ts") 


scala> input.show() 
+---+-----------+---+ 
| id|  name|age| 
+---+-----------+---+ 
| 10| Joe Doe| 34| 
| 11| Jane Doe| 31| 
| 12|Alice Jones| 25| 
+---+-----------+---+ 

scala> input.printSchema() 
root 
|-- id: long (nullable = false) 
|-- name: string (nullable = true) 
|-- age: integer (nullable = false) 


scala> output.show() 
+---+-----------+---+----+----------+ 
| id|  name|age|init|  ts| 
+---+-----------+---+----+----------+ 
| 0| Jack Smith| 41| yes|1459204800| 
| 1| Jane Jones| 22| no|1459294200| 
| 2|Alice Smith| 31| |1459595700| 
+---+-----------+---+----+----------+ 

scala> output.printSchema() 
root 
|-- id: long (nullable = false) 
|-- name: string (nullable = true) 
|-- age: integer (nullable = false) 
|-- init: string (nullable = true) 
|-- ts: long (nullable = false) 

私はoutputの終わりにinputのすべての行を追加したいと思います。同時に、列のinitを空の文字列''に、tsの列を現在のタイムスタンプに設定したいとします。 1461883875L。

ご協力いただければ幸いです。

+0

ところで、分散データのために何の「スタート」や「終了」、あなたはインデックスの忘れてならないがありません。また、Scalaで 'var'sを使うことは悪い習慣として知られています。あなたがそれがnullableでないことを示すスキーマの 'ts'のデフォルト値は何ですか –

答えて

12

スパークDataFramesは変更不可能なので、行を追加/挿入することはできません。代わりに、あなただけ不足している列を追加し、UNION ALLを使用することができます。

output.unionAll(input.select($"*", lit(""), current_timestamp.cast("long"))) 
+0

これは素晴らしいです。ありがとうございました。 – stackoverflowuser2010

+0

素敵な、私は 'output.unionAll(input.select(input.columns.map(col)++ List)(lit(" ")。as(" init ")、lit(0L).as("あなたの答えは明らかに改善されています –

+1

私はまた考えていました: 'var input2 = input.withColumn(" init "、lit(null:String))。withColumn(" ts ")):。 、current_timestamp.cast( "long")) '次にunionAll()を実行すると、 – stackoverflowuser2010

関連する問題