1
トップN値の1つを持つRDDをフィルタリングする方法は不思議です。通常、私はRDDをソートし、そのようRDDをフィルタリングするために放送することができ、N番目の値を見つけるためにドライバ内の配列としてtop
Nのアイテムを取る:スパーク - 新しいrddとしてrddのトップNを取得する方法(ドライバで収集しないで)
val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distict.sorted.take(N))
val threshold = topNvalues.last
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold)
をが、この場合には、私のNが大きすぎますので、どのように私は、ソートされたRDDのように?:
def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
val sortedPrices = itemPrices.sortBy(-_._2).map(_._1).distinct
// How to do this without collecting results to driver??
val highPrices = itemPrices.getTopNValuesWithoutCollect(count)
itemPrices.join(highPrices.keyBy(x => x)).map(_._2._1)
}