2017-01-02 54 views

答えて

0

私はあなたの質問を正しく理解しているかどうかわかりません。しかし、私は複数のアプローチがあると思います(あなたが実際に何を達成したいのかは、質問からは不明確です)。

  1. 使用カフカストリームDSL(カフカ0.10):あなたの場合:任意のサイズ
  2. のタンブリングウィンドウは、タイムスタンプ(カフカ0.10)エクスプロイトとして使用すると、ウィンドウの集約を指定することができますカフカストリーム(Javaストリーム処理ライブラリ)を使用して、 KafkaConsumerを使用したい場合は、メッセージを読んで、タイムスタンプをチェックして、間隔でデータをチャンクすることができます。
  3. システム時間ベース(すべてのカフカバージョン):カフカからのメッセージを読み込み、つまり、次のレコードを処理する前に、ローカルクロックを調べて間隔を空けてメッセージを配置します。
+0

私は、カフカから1分ごとにデータを取得するオプションがあることを示しています(例:10:01に10:00から10:01までのすべてのレコードを読む、10:02ですべて読む実行時に新しいレコードを取得するのではなく、10時01分から10時02分までのレコードなど) 処理が完了するまでメモリ内のデータを保持するのではなく、指定された間隔ごとに処理するためにデータを読み取る必要があります。 – user7365161

+0

Kafkaはプルベースなので、これは組み込みのサポートはありません。提案されたアプローチの1つを使用して、このロジックをクライアントに配置する必要があります。あなたが正しくコメントしていると分かっている場合は、poll()を開始する前に現在のend-of-logオフセットを取得するために、approach(3)を組み合わせて使用​​し、得られたオフセットにコンシューマメッセージのみを追加するあなたが消費を開始した後) –

関連する問題