2016-10-21 6 views
0

は、私が持っていると言う、次の例のようにRDD [(文字列、INT)]:私は効率的に0、1を含むレコードの合計額を印刷したいスパークフィルタ、カウント大きなRDD複数回

(A, 0) 
(B, 0) 
(C, 1) 
(D, 0) 
(E, 2) 
(F, 1) 
(G, 1) 
(H, 3) 
(I, 2) 
(J, 0) 
(K, 3) 

、2など RDDには何百万ものエントリが含まれているので、できるだけ効率的にこれを実行したいと思います。

のようなものが返されます。この例の出力:

Number of records containing 0 = 4 
Number of records containing 1 = 3 
Number of records containing 2 = 2 
Number of records containing 3 = 2 

は現在、私は別に...、0、1、2、次いでcount()大きなRDDにフィルタを実行することで、これを実装してみてくださいを。私はScalaを使用しています。

これを行うより効率的な方法はありますか?私はすでにRDDをキャッシュしていますが、私のプログラムではメモリが不足しています(私はドライバメモリを5Gに設定しています)。

EDIT:

rdd.map(_.swap).countByKey() 

私は2番目の要素が「M」または「Fのいずれかであるタプル(に文字列値を変更することによって、これを絞り込むことができ:countByKeyを使用Tzachによって示唆されるよう")、このタプルの2番目の要素の一意の値ごとにキーごとのカウントを取得しますか?例えば

(A,m), 0) 
(B,f), 0) 
(C,m), 1) 
(D,m), 0) 
(E,f), 2) 
(F,f), 1) 
(G,m), 1) 
(H,m), 3) 
(I,f), 2) 
(J,f), 0) 
(K,m), 3) 

は事前に

((0,m), 2) 
((0,f), 2) 
((1,m), 2) 
((1,f), 1) 
((2,m), 0) 
((2,f), 2) 
((3,m), 2) 
((3,f), 0) 

おかげでしまいます!

答えて

2

あなたはそのための便利なcountByKeyを使用することができます - ちょうどキー数値を作るために、あらかじめ入力して場所を入れ替える:

val rdd = sc.parallelize(Seq(
    ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2), 
    ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3) 
)) 

rdd.map(_.swap).countByKey().foreach(println) 
// (0,4) 
// (1,3) 
// (3,2) 
// (2,2) 

EDITcountByKeyはそれはのように聞こえるまさにん - そうあなたが使用したいものは何でもキー、単にタプルの左側一部としてあることを持っているあなたのRDDを変換する、例えば:

rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey() 

か:

rdd.keyBy { case ((_, b), i) => (i, b) }.countByKey() 
+0

ありがとう。問題のもう少し洗練された答えを編集しました。 – Laurens

+1

これに応じて編集された回答(フォローアップの質問はSOのエチケットと本当にインラインではありません - 将来の読者が従うのが難しく、応答者に不公平になることもあります) –

関連する問題