2017-09-15 1 views
0

に基づいて平均値計算私は2つのデータフレームを持っている:分割スパークデータフレームおよび1つの列値

Class, Calculation 
first, Average 
Second, Sum 
Third, Average 

セカンドデータフレームstudentRecordは、以下のように周り50Kエントリ有する:

まずデータフレーム classRecordは、次のような10個の異なるエントリを有します
Name, height, Camp, Class 
Shae, 152, yellow, first 
Joe, 140, yellow, first 
Mike, 149, white, first 
Anne, 142, red, first 
Tim, 154, red, Second 
Jake, 153, white, Second 
Sherley, 153, white, Second 

2番目のデータフレームから、クラスの型に基づいて、キャンプに基づいて高さ(最初のクラスは平均、2番目のクラスは合計:等)を計算したいと思います(クラスがfir黄色、白、その他の別々の平均)。ここで

//function to calculate average 
def averageOnName(splitFrame : org.apache.spark.sql.DataFrame) : Array[(String, Double)] = { 
    val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name",$"height".cast("double")).as[(String, Double)].rdd 
    var avg_by_key = pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1/y._2).collect 
    return avg_by_key 
} 

//required schema for further modifications 
val schema = StructType(
StructField("name", StringType, false) :: 
StructField("avg", DoubleType, false) :: Nil) 

// for each loop on each class type 
classRecord.rdd.foreach{ 
    //filter students based on camps 
    var campYellow =studentRecord.filter($"Camp" === "yellow") 
    var campWhite =studentRecord.filter($"Camp" === "white") 
    var campRed =studentRecord.filter($"Camp" === "red") 

    // since I know that calculation for first class is average, so representing calculation only for class first 
    val avgcampYellow = averageOnName(campYellow) 
    val avgcampWhite = averageOnName(campWhite) 
    val avgcampRed = averageOnName(campRed) 

    // union of all 
    val rddYellow = sc.parallelize (avgcampYellow).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfYellow = sqlContext.createDataFrame(rddYellow, schema) 
    //union with yellow camp data 
    val rddWhite = sc.parallelize (avgcampWhite).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfWhite = sqlContext.createDataFrame(rddWhite, schema) 
    var dfYellWhite = dfYellow.union(dfWhite) 
    //union with yellow,white camp data 
    val rddRed = sc.parallelize (avgcampRed).map (x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue())) 
    //conversion of rdd to frame 
    var dfRed = sqlContext.createDataFrame(rddRed, schema) 
    var dfYellWhiteRed = dfYellWhite .union(dfRed) 
    // other modifications and final result to hive 
} 

私は苦労しています:

1.hardcoding Yellow, red and white, there may be other camp type also. 
2. Filtering same dataframe many times 
3. Not able to figure out how to calculate differently according to class calculation type. 

ヘルプは高く評価され 私は、次の試してみました。ありがとう。

+0

私が正しく理解していれば、キャンプとクラスの両方に応じて平均値または合計の高さが求められますか?どのようなキャンプ/クラスのすべての組み合わせの両方の計算については、それをデータフレームに入れ、別に 'classRecord'dfの読書をしますか? – Shaido

答えて

0

クラス/キャンプのすべての組み合わせの平均と合計を計算してから、classRecordデータフレームを別々に解析して、必要なものを抽出することができます。これは、groupBy()メソッドを使用してスパークして簡単に行うことができ、値を集約します。あなたの例のデータフレームを使用して

:これを実行した後

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

studentRecord.show() 

+-------+------+------+------+ 
| Name|height| Camp| Class| 
+-------+------+------+------+ 
| Shae| 152|yellow| first| 
| Joe| 140|yellow| first| 
| Mike| 149| white| first| 
| Anne| 142| red| first| 
| Tim| 154| red|Second| 
| Jake| 153| white|Second| 
|Sherley| 153| white|Second| 
+-------+------+------+------+ 

val df = studentRecord.groupBy("Class", "Camp").agg(
    sum($"height").as("Sum"), 
    avg($"height").as("Average"), 
    collect_list($"Name").as("Names")) 
df.show() 

+------+------+---+-------+---------------+ 
| Class| Camp|Sum|Average|   Names| 
+------+------+---+-------+---------------+ 
| first| white|149| 149.0|   [Mike]| 
| first| red|142| 142.0|   [Anne]| 
|Second| red|154| 154.0|   [Tim]| 
|Second| white|306| 153.0|[Jake, Sherley]| 
| first|yellow|292| 146.0| [Shae, Joe]| 
+------+------+---+-------+---------------+ 

は、あなたは単にあなたが必要がある行の後にあなたの最初のclassRecordデータフレームを確認することができます。どのように見えるかの例は、実際のニーズの後に変更できます。

// Collects the dataframe as an Array[(String, String)] 
val classRecs = classRecord.collect().map{case Row(clas: String, calc: String) => (clas, calc)} 

for (classRec <- classRecs){ 
    val clas = classRec._1 
    val calc = classRec._2 

    // Matches which calculation you want to do 
    val df2 = calc match { 
    case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average") 
    case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum") 
    } 

// Do something with df2 
} 

希望すると助かります。

+0

部分的にこれと一緒に、私はすべての名前が "クラス、キャンプ、名前、平均"のようなものにも属している必要があります。最終的なDFを取得したとしても。どのように私はクラスのために最初に私は平均を選択する必要があることを決める(捨てざる)、私は合計(捨てavg)などが必要です。 – Swati

+0

私は上記の解決策を試してみましたが、エラーを示しています:値groupbyはorg.apache.spark.rdd.RDD [String]のメンバーではありません。ありがとう。 – Swati

+0

@スワティ申し訳ありませんが、大文字の「B」の 'groupBy()'があったはずです。ソリューションの名前の一覧も追加しました。 – Shaido

関連する問題