2016-11-06 4 views
1

スパークバージョン2.0.1とスカラバージョン2.11.8でスパークシェルを実行しています。spark `reduceGroups`エラー代替方法によるオーバーロードされたメソッド

次のコードは、チェックを入力して失敗します。

val is = sc.parallelize(0 until 100) 
val ds = is.map{i => (s"${i%10}", i)} 
val gs = ds.groupByKey(r => r._1) 
gs.reduceGroups((v: ((String, Int), (String, Int))) => (v._1._1, v._1._2 + v._2._2)) 

エラーメッセージが

<console>:32: error: overloaded method value reduceGroups with alternatives: 
    (f: org.apache.spark.api.java.function.ReduceFunction[(String, Int)])org.apache.spark.sql.Dataset[(String, (String, Int))] <and> 
    (f: ((String, Int), (String, Int)) => (String, Int))org.apache.spark.sql.Dataset[(String, (String, Int))] 
cannot be applied to ((((String, Int), (String, Int))) => (String, Int)) 
     gs.reduceGroups((r : ((String, Int), (String, Int))) => (r._1._1, r._1._2 + r._2._2)) 

私の知る限り、私はreduceGroupsに渡すラムダが正確で必要な署名と一致しています2番目の選択肢。

答えて

2

reduceGroupsは、2つの引数をとる関数を想定していますが、渡す関数は1つの引数の関数です。比較署名はあなたが合格:ながら期待

((V, V)) ⇒ V 

です:V(String, Int)ある

(V, V) ⇒ V 

を使用でき

gs.reduceGroups(
    (v1: (String, Int), v2: (String, Int)) => (v1._1, v1._2 + v2._2) 
) 

キーと重複しないより簡潔な解決策、:

spark.range(0, 100) 
    .groupByKey(i => s"${i % 10}") 
    .reduceGroups(_ + _) 
関連する問題