2017-12-08 3 views
0

カフカストリームを使用して、サイズの異なる時間ウィンドウを使用して大量のデータを集約しようとしています。カフカストリームを使用して大量のデータを集める

キャッシュサイズを2 GBに増やしましたが、ウィンドウサイズを1時間に設定すると、CPU負荷が100%になりアプリケーションが減速し始めます。

私のコードは次のようになります。

val tradeStream = builder.stream<String, Trade>(configuration.topicNamePattern, Consumed.with(Serdes.String(), JsonSerde(Trade::class.java))) 

tradeStream 
    .groupBy(
      { _, trade -> trade.pair }, 
      Serialized.with(JsonSerde(TokensPair::class.java), JsonSerde(Trade::class.java)) 
    ) 
    .windowedBy(TimeWindows.of(windowDuration).advanceBy(windowHop).until(windowDuration)) 
    .aggregate(
     { Ticker(windowDuration) }, 
     { _, newValue, aggregate -> aggregate.add(newValue) }, 
     Materialized.`as`<TokensPair, Ticker>(storeByPairs) 
       .withKeySerde(JsonSerde(TokensPair::class.java)) 
       .withValueSerde(JsonSerde(Ticker::class.java)) 
    ) 
    .toStream() 
    .filter { tokensPair, _ -> filterFinishedWindow(tokensPair.window(), windowHop) } 
    .map { tokensPair, ticker -> KeyValue(
      TickerKey(ticker.tokensPair!!, windowDuration, Timestamp(tokensPair.window().start())), 
      ticker.calcPrice() 
    )} 
    .to(topicName, Produced.with(JsonSerde(TickerKey::class.java), JsonSerde(Ticker::class.java))) 

また、彼らは順番に、ウィンドウの終了時間によってフィルタリングされカフカのトピックに集約されたデータを送信する前に終了したばかりのウィンドウトピックに送信します。

おそらく、この種の集約を実装するためのいくつかのより良いアプローチがありますか?

答えて

0

システムをもう少し知ることで、診断が難しくなります。

クラスタにはいくつのパーティションがありますか? 実行しているストリームアプリケーションの数はいくつですか? ストリームアプリケーションは同じマシンで実行されていますか? ペイロードに圧縮を使用していますか? 小さな間隔でも機能しますか?

希望に役立ちます。

関連する問題