2016-08-22 7 views
0
val temp = sqlContext.sql(s"SELECT A, B, C, (CASE WHEN (D) in (1,2,3) THEN ((E)+0.000)/60 ELSE 0 END) AS Z from TEST.TEST_TABLE") 
val temp1 = temp.map({ temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3)))}) 
.reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2))) 

ハイブレイヤーの計算(ケース評価)を行っている上記のコードの代わりに、私はスカラで変換を行いたいと思います。どうすればいい?ハイブSQLクエリからSparkへの変換を移動する

マップ内のデータを埋めて同じことを実行できますか?

+0

の呼出Scalaの機能ロジックは 'withColumn'method離れ以下sarveshによって示唆' map'方法から別のアプローチであるなら –

答えて

1
val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE") 

val tempTransform = temp.map(row => { 
    val z = List[Double](1, 2, 3).contains(row.getDouble(3)) match { 
    case true => row.getDouble(4)/60 
    case _ => 0 
    } 
    Row(row.getShort(0), Row.getString(1), Row.getDouble(2), z) 
}) 

val temp1 = tempTransform.map({ temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3)))}) 
    .reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2))) 
1

あなたのようなデータフレームもそのSQLを実行し、同様

new_df = old_df.withColumn('target_column', udf(df.name)) 

あなたのケースでは、このexample

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ // for `toDF` and $"" 
import org.apache.spark.sql.functions._ // for `when` 

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5))) 
    .toDF("A", "B", "C") 

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1)) 

によってrefferedとしてこの構文を使用することができ、以下の val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE")

を適用します3210ケース又はwhenotherwise伴うまたは必要スパークudf

は、代わりhiveudf

関連する問題