2017-08-21 4 views
2

異なる果物のソースがあり、それらのカウントをデータベースに挿入したいとします。Scala/Akkaストリームの要素のグループ化

私はこのような何か行うことができます。

Flow[Fruits] 
.map { item => 
    insertItemToDatabase(item) 
} 

をしかし、それは遅い明らかである - なぜグループ際に私ができる、すべての項目でデータベースにそれらを挿入しますか?

Flow[Fruits] 
.grouped(10000) 
.map { items => 
    insertItemsToDatabase(items) 
} 

しかし、それは私が彼らがデータベースにフラッシュされるまで、メモリ内に10個の000要素 [banana, orange, orange, orange, banana, ...]を保持する必要があることを意味します。だから私は、よりよい解決策を考え出しました。これは効率的ではないですか?おそらく、私はこのような何か行うことができます。私の理解から

Flow[Fruits] 
.grouped(100) 
.map { items => 
    consolidate(items) // this will return Map[String, Int] 
} 
.grouped(100) 
// here I have Seq[Map[String, Int]] 
.map { mapOfItems=> 
    insertMapToDatabase(mapOfItems) 
} 

を、これも一度に10個の000要素を処理する必要がありますが、(要素が頻繁に繰り返され、提供する)など多くのメモリを取るべきではありません。しかし、各キーはまだメモリに100回繰り返されます。確かに私はすることができます.grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map() ...しかし、良い方法はありませんか?おそらく、このような何か:

Flow[Fruits] 
.map { item => 
    addToMap(item) 
    if(myMap.length == 10000) { 
     insertToDatabase(myMap) 
     clearMyMap() 
    } 
} 

しかし、それはアッカの概念を破壊しないが、処理段階の、すなわち独立性(したがって、同時実行)をストリーム?

+0

"groupedWithin'関数を見てください。要素の最大範囲と時間率の2つのパラメータが必要です。たとえば、 '' .groupedWithnin(5000,1.seconds) ''は、1秒前に到達した場合に5000個の要素を処理するか、1秒間に累積された要素の数を返します。 – alifirat

+0

あなたの提案に感謝@alifirat、それはグループ化する方法とはまったく異なります。私が必要とするのは、私が持っているデータを処理する別の方法です。 –

答えて

1

Fruitセットのカーディナリティが低い場合は、すべてのカウントを含む単一のマップを保持し、フルーツ値をすべてストリーミングした後にデータベースにフラッシュすることができます。

まず、ランニングカウントし続けますフロー構築:

type Count = Int 

type FruitCount = Map[Fruit, Count] 

val zeroCount : FruitCount = 
    Map.empty[Fruit, Count] withDefaultValue 0 

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
    (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1) 

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] = 
    Flow[Fruit].scan(zeroCount)(appendFruitToCount) 

今すぐストリームを最後FruitCountを受け取り、実体化されますシンクを作成する:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount] 

val fruitSource : Source[Fruit, NotUsed] = ??? 

val lastFruitCountFut : Future[Option[FruitCount]] = 
    fruitSource 
    .via(fruitCountFlow) 
    .to(lastFruitCountSink) 
    .run() 

lastFruitCountFutはその後することができ値をデータベースに送信するために使用されます。

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) => 
    insertItemsToDatabase(Iterator.fill(count)(fruit)) 
})) 

Iteratorは、フルーツアイテムのTraversableOnceを構築するのに最もメモリ効率の良いコレクションであるため、使用されています。

この解決策では、1つのキーごとに1つの異なるフルーツタイプ& 1つのキーに対して1つのキーを持つメモリに1 Mapを保存します。

関連する問題