2017-11-29 1 views
1

トップN値の1つを持つRDDをフィルタリングする方法は不思議です。通常、私はRDDをソートし、そのようRDDをフィルタリングするために放送することができ、N番目の値を見つけるためにドライバ内の配列としてtop Nのアイテムを取る:スパーク - 新しいrddとしてrddのトップNを取得する方法(ドライバで収集しないで)

val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distict.sorted.take(N)) 
val threshold = topNvalues.last 
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold) 

をが、この場合には、私のNが大きすぎますので、どのように私は、ソートされたRDDのように?:

def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = { 
    val sortedPrices = itemPrices.sortBy(-_._2).map(_._1).distinct 

    // How to do this without collecting results to driver?? 
    val highPrices = itemPrices.getTopNValuesWithoutCollect(count) 

    itemPrices.join(highPrices.keyBy(x => x)).map(_._2._1) 
} 

答えて

2

使用zipWithIndexのようなRDDSで純粋にこれを行うにして、n個のアイテムをインデックスに巻き絞り込むことができます。

そして

rdd.zipWithIndex.filter(_._2 < 4) 
を場合を示し降順にソートこのRRDを考慮し、

val rdd = sc.parallelize((1 to 10).map(_ => math.random)).sortBy(-_) 

することがドライバにRDDを収集することなく、第1 トップ 4つの項目を提供します。

関連する問題