0
カフカストリームを使用して、サイズの異なる時間ウィンドウを使用して大量のデータを集約しようとしています。カフカストリームを使用して大量のデータを集める
キャッシュサイズを2 GBに増やしましたが、ウィンドウサイズを1時間に設定すると、CPU負荷が100%になりアプリケーションが減速し始めます。
私のコードは次のようになります。
val tradeStream = builder.stream<String, Trade>(configuration.topicNamePattern, Consumed.with(Serdes.String(), JsonSerde(Trade::class.java)))
tradeStream
.groupBy(
{ _, trade -> trade.pair },
Serialized.with(JsonSerde(TokensPair::class.java), JsonSerde(Trade::class.java))
)
.windowedBy(TimeWindows.of(windowDuration).advanceBy(windowHop).until(windowDuration))
.aggregate(
{ Ticker(windowDuration) },
{ _, newValue, aggregate -> aggregate.add(newValue) },
Materialized.`as`<TokensPair, Ticker>(storeByPairs)
.withKeySerde(JsonSerde(TokensPair::class.java))
.withValueSerde(JsonSerde(Ticker::class.java))
)
.toStream()
.filter { tokensPair, _ -> filterFinishedWindow(tokensPair.window(), windowHop) }
.map { tokensPair, ticker -> KeyValue(
TickerKey(ticker.tokensPair!!, windowDuration, Timestamp(tokensPair.window().start())),
ticker.calcPrice()
)}
.to(topicName, Produced.with(JsonSerde(TickerKey::class.java), JsonSerde(Ticker::class.java)))
また、彼らは順番に、ウィンドウの終了時間によってフィルタリングされカフカのトピックに集約されたデータを送信する前に終了したばかりのウィンドウトピックに送信します。
おそらく、この種の集約を実装するためのいくつかのより良いアプローチがありますか?