2016-12-14 4 views
0

複数のファイルを読み込み、繰り返し行を数え、繰り返し数で行を並べ替え、上位10行を繰り返します。複数のソート済みパーティションから最初のn個の要素を取得する

lines = env.readTextFile("logs-dir") 
tuples = lines.map(line -> Tuple2(line, 1)) 
aggregate = tuples.groupBy(0).sum(1) 
sort = aggregate.sortPartition(1, Order.DESCENDING) 
sorted.first(10).writeAsText("domains") 

問題は、first-nが任意であり、すべてのパーティションからランダムな10番目の要素を返すことです。

並列性を1に減らさずに、すべてのパーティションからソートされたfirst-n要素を選択する方法はありますか?

答えて

1

私はこの問題を、各パーティションの最初の10個の要素を返す並列MapPartitionFunctionで解決し、その結果を単一のパーティションに送り、ソートして最初の10個をもう一度取ります。これは、次のようになります。

lines = env.readTextFile("logs-dir") 
tuples = lines.map(line -> Tuple2(line, 1)) 
aggregate = tuples.groupBy(0).sum(1) 

// sort partitions in parallel 
sortPart = aggregate.sortPartition(1, Order.DESCENDING) 
// take first 10 of each partition 
firstPart = sortPart.mapPartition(new First(10)) 

// sort all in one partition 
sortFull = firstPart.sortPartition(1, Order.DESCENDING).parallelism(1) 
// take first 10 
first10 = sortFull.mapPartition(new First(10)) 
first10.writeAsText("domains") 

MapPartitionFunctionFirstは非常に簡単になります。カウンタが0に下がったときに、mapPartition()関数から転送するレコード数をカウントダウンします。

関連する問題