2016-12-21 4 views
2

私はSpark 1.6.1を使用しています。同じ行の他の列に基づいて列の内容を動的に選択

categoryNameがString型であり、 cat*はダブルです
+------------+-----+----+ 
|categoryName|catA |catB| 
+------------+-----+----+ 
|  catA |0.25 |0.75| 
|  catB |0.5 |0.5 | 
+------------+-----+----+ 

:私のデータフレームがどのように見えると言うことができます。私はいくつかの後の計算に、このような抽出を必要とする

+------------+-----+----+-------+ 
|categoryName|catA |catB| score | 
+------------+-----+----+-------+ 
|  catA |0.25 |0.75| 0.25 | ('score' has value from column name 'catA') 
|  catB |0.5 |0.7 | 0.7 | ('score' value from column name 'catB') 
+------------+-----+----+-------+ 

:私はcategoryName列にある名前列の値が含まれます列を追加したいと思います。何か案は?

重要:カテゴリの列の名前はわかりません。ソリューションは動的である必要があります。

答えて

3

スパーク2.0: あなたが作成することにより、(カテゴリ任意の数の列のために)これを行うことができますcategroyName - > categoryValueのマップを保持し、それから選択する一時的な列:

// sequence of any number of category columns 
val catCols = input.columns.filterNot(_ == "categoryName") 

// create a map of category -> value, and then select from that map using categoryName: 
input 
    .withColumn("asMap", map(catCols.flatMap(c => Seq(lit(c), col(c))): _*)) 
    .withColumn("score", $"asMap".apply($"categoryName")) 
    .drop("asMap") 

スパーク1.6:同様のアイデアが、それから選択するために、アレイとUDFを使用して:

// sequence of any number of category columns 
val catCols = input.columns.filterNot(_ == "categoryName") 

// UDF to select from array by index of colName in catCols 
val getByColName = udf[Double, String, mutable.WrappedArray[Double]] { 
    case (colName, colValues) => 
    val index = catCols.zipWithIndex.find(_._1 == colName).map(_._2) 
    index.map(colValues.apply).getOrElse(0.0) 
} 

// create an array of category values and select from it using UDF: 
input 
    .withColumn("asArray", array(catCols.map(col): _*)) 
    .withColumn("score", getByColName($"categoryName", $"asArray")) 
    .drop("asArray") 
1

あなたは、いくつかのオプションがあります:あなたは、あなたが単に計算を行うマップを作成し、その場合にはデータセットのAPIを使用することができスカラ座を使用している場合

  1. を。
  2. あなたは、あなたが/それ以外の句が何をしたときの束を使用することができ、入力として、関連するすべての列を受け取り、UDFを作成し、
  3. 内の計算を行うことができます
  4. をデータフレームからRDDに移動し、マップを使用することができます検索(。例えば(COL1 == CATA、COL(CATA))そうでない場合(COL(CATB)))
+0

1)データセットAPIは右、スパーク2.0.0からでしょうか?私は1.6.1を使用しています 2)うーん、おそらく、私はそれをチェックします。 3.)しかし、私はudfの中で列名のコンテキストを緩くします、そうですか? 4.)動的ではありません –

+0

spark 1.6.1はscala(2.0で変更)のデータセットAPIを持っています。 UDFに移動するときは、順序を設定して列名を使用できます。 –

関連する問題