異なる果物のソースがあり、それらのカウントをデータベースに挿入したいとします。Scala/Akkaストリームの要素のグループ化
私はこのような何か行うことができます。
Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}
をしかし、それは遅い明らかである - なぜグループ際に私ができる、すべての項目でデータベースにそれらを挿入しますか?
Flow[Fruits]
.grouped(10000)
.map { items =>
insertItemsToDatabase(items)
}
しかし、それは私が彼らがデータベースにフラッシュされるまで、メモリ内に10個の000要素
[banana, orange, orange, orange, banana, ...]
を保持する必要があることを意味します。だから私は、よりよい解決策を考え出しました。これは効率的ではないですか?おそらく、私はこのような何か行うことができます。私の理解から
Flow[Fruits]
.grouped(100)
.map { items =>
consolidate(items) // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
insertMapToDatabase(mapOfItems)
}
を、これも一度に10個の000要素を処理する必要がありますが、(要素が頻繁に繰り返され、提供する)など多くのメモリを取るべきではありません。しかし、各キーはまだメモリに100回繰り返されます。確かに私はすることができます.grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()
...しかし、良い方法はありませんか?おそらく、このような何か:
Flow[Fruits]
.map { item =>
addToMap(item)
if(myMap.length == 10000) {
insertToDatabase(myMap)
clearMyMap()
}
}
しかし、それはアッカの概念を破壊しないが、処理段階の、すなわち独立性(したがって、同時実行)をストリーム?
"groupedWithin'関数を見てください。要素の最大範囲と時間率の2つのパラメータが必要です。たとえば、 '' .groupedWithnin(5000,1.seconds) ''は、1秒前に到達した場合に5000個の要素を処理するか、1秒間に累積された要素の数を返します。 – alifirat
あなたの提案に感謝@alifirat、それはグループ化する方法とはまったく異なります。私が必要とするのは、私が持っているデータを処理する別の方法です。 –