2016-06-14 8 views
4

Spark Dataset(Spark 1.6.1 Version)を使用しています。以下 は私が複数の列にgroup by句を実行したかった私のコード今sparkデータセットのグループ化方法

object App { 

val conf = new SparkConf() 
.setMaster("local") 
.setAppName("SparkETL") 

val sc = new SparkContext(conf) 
sc.setLogLevel("ERROR") 
val sqlContext = new SQLContext(sc); 
import sqlContext.implicits._ 

} 

override def readDataTable(tableName:String):DataFrame={ 
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP); 
return dataFrame; 
} 


case class Student(stud_id , sname , saddress) 
case class Student(classid, stud_id, name) 


var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student") 

var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student") 


var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff") 

のですか? どうすればいいですか? result.groupBy(_._1._1.created_at)このようにすればいいですか? もしそうなら、私は複数の列でそれを行う方法によってグループとして結果を見ることができませんか?

答えて

0

あなたの要件を正しく理解している場合は、ここではreduceByKeyの機能をPairRDDFunctionsクラスに使用することをお勧めします。

機能の署名は​​であり、一連のキーと値のペアを使用することを意味します。

私はワークフローを説明してみましょう:あなたはで動作するようにMANTセットを取得

  1. を(あなたのコードで:result)RDD map機能付き
  2. あなたはタプルは2を含む結果セットを分割しました(例:result.map(row => ((row.key1, row.key2), (row.value1, row.value2))
  3. RDD [(K、V)]ここで、Kはキーフィールドタプルのタイプであり、Vはタイプです。値フィールドのタプル
  4. (: (agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2)例)

は注意してください:あなたは直接集計値タイプ(V,V) => Vの機能渡すことでreduceByKeyを使用することができますが、集計関数から同じ値の型を返す必要が

  • groupByの場合も同じ理由が考えられます。 e RDDをペアRDD[K,V]に開始するが、集計関数を持たない。これは、次の計算のためにseqに値を格納するだけなので、
  • 集計の開始値が必要な場合foldByKey機能
関連する問題