2016-08-20 4 views
0

私の質問は、PySpark reduceByKey on multiple valuesに似ていますが、何らかの形で重要な違いがあります。私はPySparkを初めて使っているので、明らかに何かを見逃しています。ネストされたタプルのPyspark reduceByKey

私は次のような構造でRDDを持っている:私が欲しいもの

(K0, ((k01,v01), (k02,v02), ...)) 
.... 
(Kn, ((kn1,vn1), (kn2,vn2), ...)) 

出力は

(K0, v01+v02+...) 
... 
(Kn, vn1+vn2+...) 

のようなものであるように、これはreduceByKeyを使用するのに最適なケースのように思えるし、最初に思いました何かのようなもの

rdd.reduceByKey(lambda x,y: x[1]+y[1]) 

これは、私が始めたRDDと。入れ子にされたタプルがあるため、インデックス作成に問題があると思われますが、可能なすべてのインデックスの組み合わせを試しました。最初のRDDを元に戻し続けます。

ネストされたタプルでは機能しない理由があるのでしょうか、何か間違っていますか?

答えて

0

ここではreduceByKeyを使用しないでください。それは、署名付きの連想的かつ可換の関数をとる。 (T, T) => T。入力としてList[Tuple[U, T]]があり、出力としてTが必要な場合は適用できないことは明らかです。

キーまたはユニークであるかどうかは、ローカルとグローバルの両方で集計する必要がある場合の一般的な例を考慮することはできません。キーはすでに簡単なmapValuesは十分です一意である場合

from functools import reduce 
from operator import add 

def agg_(xs): 
    # For numeric values sum would be more idiomatic 
    # but lets make it more generic 
    return reduce(add, (x[1] for x in xs), zero_value) 

zero_value = 0 
merge_op = add 
def seq_op(acc, xs): 
    return acc + agg_(xs) 

rdd = sc.parallelize([ 
    ("K0", (("k01", 3), ("k02", 2))), 
    ("K0", (("k03", 5), ("k04", 6))), 
    ("K1", (("k11", 0), ("k12", -1)))]) 

rdd.aggregateByKey(0, seq_op, merge_op).take(2) 
## [('K0', 16), ('K1', -1)] 

from itertools import chain 

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x))) 
unique_keys.mapValues(agg_).take(2) 
## [('K0', 16), ('K1', -1)] 
+0

今の私には明らかであることv01v02は、... vmは、単純な数値であると仮定します。はい、キーはユニークなので、mapValuesのアプローチは必要なものです。どうもありがとうございました。 –

関連する問題