2016-05-15 7 views
0

私は、データはこれに似て設定している場合:ペアRDD変換

val list = List ((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))

と出力があるべきように、Iごとのキーの平均を見つけたい:

(1, 2), (2, 3/2), (3, 2)私はこれを行うことができますgroupByKey, countByKey, and reduceByKeyをどういうふうに使うか、以下のようなcombineByKeyメソッドを使う必要があります:groupByKey, countByKey, and reduceByKeyを使ってみましたが、この組み合わせのメソッドはうまくいきません。これらの3つのメソッドを使って行う方法が分かっているのでしょうか?

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 

result.collectAsMap().map(println(_)) 

答えて

4

あなたは次のことを試してみてください:

val sc: SparkContext = ... 
val input = sc.parallelize(List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))) 
val averages = input.groupByKey.map { case (key, values) => 
    (key, values.sum/values.size.toDouble) 
} 

println(averages.collect().toList) // List((1,2.0), (2,1.5), (3,2.0)) 
+0

ああ右その配列に!お手伝いありがとう! – CapturedTree

1

さてあなたは、単にPairRDDFunctions.groupByKeyを使用して、必要なものを計算することができます。

val avgKey = input.groupByKey.map{ 
    case (k, v) => (k, v.sum.toDouble/v.size) 
} 
avgkey.collect 
//res2: Array[(Int, Double)] = Array((3,2.0), (1,2.0), (2,1.5)) 
1

トリプレットduplesの形質転換前に、reduceByKeyを使用して

rdd.map{ case(k,v) => (k,(v,1)) }. 
    reduceByKey((a,v) => (a._1+v._1, a._2+v._2)). 
    map {case (k,v) => (k, v._1/v._2)} 
私はあなたがサムとサイズを使用することができますので、あなたはそれが配列の値を置くgroupByKey際に忘れてしまった
+0

こんにちはエルム!遅く返事を申し訳ありませんが、マップ関数の中で 'case'を使用するときに説明してください。 'k'と' v'がどのように構造化されているかを具体的に指定するのではなく、 '(k、v)'としてパラメータを書くことができますか?例えば、 'k'がタプルの場合、マップに大文字小文字を入れずに'((a1、a2)、v) 'と書く必要がありますか?だから、技術的にはパターンマッチングだけです。 – CapturedTree

+0

'case'では、データ構造の抽出や分解のためのパターンマッチングを有効にし、中括弧を使って部分関数を定義します(すべてのパターンを定義する必要はありません)。一方、タプルデータ構造の使用には、データ項目を取得(抽出)するために独自のメソッド(._1および._2)を使用する必要があります。 – elm

関連する問題