2016-06-01 4 views
0

我々は2つのデータフレーム(説明のためのScalaの構文に注意してください)、私たちはこの新しいデータフレームを得るように、各フレームから1列を合計する方法異なるデータフレームからのスパーク和の列に

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x") 

val df2 = sc.parallelize(2 to 4).map(i => (i,i*100)).toDF("id","y") 

+---+---------+ 
| id| x_plus_y| 
+---+---------+ 
| 1|  10| 
| 2|  220| 
| 3|  330| 
| 4|  440| 
+---+---------+ 
を持っています

これを試みたが、それは最初の行を無効、

sqlContext.sql("select df1.id, x+y as x_plus_y from df1 left join df2 on df1.id=df2.id").show 

+---+--------+ 
| id|x_plus_y| 
+---+--------+ 
| 1| null| 
| 2|  220| 
| 3|  330| 
| 4|  440| 
+---+--------+ 

答えて

1

val d = sqlContext.sql(""" 
    select df1.id, x, y from df1 left join df2 on df1.id=df2.id""").na.fill(0) 

をこのソリューションに気づきましたそのためのUDF:

val df3 = df1.as('a).join(df2.as('b), $"a.id" === $"b.id","left"). 
       select(df1("id"),'x,'y,(coalesce('x, lit(0)) + coalesce('y, lit(0))).alias("x_plus_y")).na.fill(0) 

df3.show 
// df3: org.apache.spark.sql.DataFrame = [id: int, x: int, y: int, x_plus_y: int] 
// +---+---+---+--------+ 
// | id| x| y|x_plus_y| 
// +---+---+---+--------+ 
// | 1| 10| 0|  10| 
// | 2| 20|200|  220| 
// | 3| 30|300|  330| 
// | 4| 40|400|  440| 
// +---+---+---+--------+ 
3
df3 = df1.join(df2, df1.id == df2.id, "left_outer").select(df1.id, df1.x, df2.y).fillna(0) 
df3.select("id", (df3.x + df3.y).alias("x_plus_y")).show() 

これはPythonで動作します。 Scalaで

0

は、あなたも使用する必要はありません、フレームに参加し、ゼロと非利用可能な値を交換し、このUDFを定義するには、

import org.apache.spark.sql.functions 
import org.apache.spark.sql.functions._ 

val plus: (Int,Int) => Int = (x:Int,y:Int) => x+y 
val plus_udf = udf(plus) 

d.withColumn("x_plus_y", plus_udf($"x", $"y")).show 
+---+---+---+--------+ 
| id| x| y|x_plus_y| 
+---+---+---+--------+ 
| 1| 10| 0|  10| 
| 2| 20|200|  220| 
| 3| 30|300|  330| 
| 4| 40|400|  440| 
+---+---+---+--------+ 
関連する問題