2016-08-09 2 views
1

2つのドキュメントのコレクションがあります。私は、k1は最初のコレクションからの文書である場合、k2は第二から1であり、cは、それらの間のコサイン類似度であるコサイン類似度を実行するときのPysparkフィルタの上位3つの一致

(k1,(k2,c)) 

をデカルト積の各ペア間のコサイン類似度を計算し、フォームのRDD

を持っています。

私は最初のコレクションの各ドキュメントk1について、2番目のコレクションと最も類似している3つのドキュメントを取得することに興味があります。私はキーでグループを演奏しました:

grouped = (pairRddWithCosine 
     .groupByKey() 
     .map(lambda (k, v): (k, sorted(v, key=lambda x: -x[1]))) 
     .map(lambda (x,y): (x, y[0][0],y[0][1], y[1][0], y[1][1],   y[2][0] , y[2][1])) 
    ) 

このグループは非常に悪いです。どうすればそれを調整することができますか、それともデータをシャッフルしないものを使用するかを教えてください。

+2

素晴らしい宿題。何を試しましたか? – eliasah

答えて

0

は、私はあなたがそれが速くgroupByKeyよりも動作しますので、ローカルが最初に削減しようとする値

k_with_top_c = rdd.reduceByKey(lambda v: sorted(v, key=lambda x: -x[1])[:3]) 

reduceByKeyの一部にのみ興味があるので、あなたがreduceByKeyを試みるべきだと思います。しかし、私はあなたがこの場合シャッフルを避けることができるとは思わない。

1

キーの値の合計/カウント/部分を取得する場合は、groupByKeyはすべてのデータをシャッフルするので、groupByKeyを回避する必要があります。特定のキーのすべての値が同じレデューサーで終わるようにします。大規模なデータセットの場合、これは非常に高価です。代わりに、reduceByKeyまたはcombineByKeyを使用する必要があります。これらの操作では、各パーティションにデータを蓄積する機能と、異なるパーティションからのアキュムレーター間のマージ機能を指定できます。あなたはより多くの詳細については、これを読むことができます。またhttps://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

0

を、私たちは参加続い

smallRdd = pairRddWithCosine.map(lambda (k1,(k2,c))) 

その後、

Combined = (smallRdd 
     .combineByKey(lambda value: [value], 
         lambda x, value: x + [value], 
         lambda x, y : max(x,y)) 
     .map(lambda (x,y): (x,y[0])) 
     .map(lambda x: (x,0)) 
     ) 

を取る場合は、最初のマッチを提供することになると思います。私たちはpairRddWithCosineからbestOlderJoin() を実行してベストマッチでない要素をすべて取得して、ベストマッチを得て2番目のベストを得ることができます。

関連する問題