カフカクラスターからのメッセージまたはデータを、指定された日に取得する方法を教えてください。例えば9月13日、誰でも私にこのコードを提供できますか?私はそれをgoogledと理論だけが見つかりましたが、私はコードが欲しいKafkaからのタイムスタンプベースのデータの取得
答えて
これのアクセス方法はありません。また、Kafka v0.10
のメッセージにはタイムスタンプ情報が含まれていないため、メッセージがトピックに書き込まれた時期を知ることはできません。
Kafka v0.10
以降、各メッセージにはメタデータのタイムスタンプ属性が含まれています。この属性は、メッセージ作成時にプロデューサによって設定されるか、メッセージ挿入時にブローカによって設定されます。時間ベースの索引が計画されていますが、まだ使用できません。したがって、トピック全体を消費し、タイムスタンプフィールドをチェックする必要があります(あなたが関心のないすべてのメッセージを無視します)。開始点を見つけるために、オフセットとタイムスタンプに関するバイナリ検索を実行して、最初のメッセージをより早く見つけることもできます。
更新:
Kakfa 0.10.1
時間ベースのインデックスを追加します。指定したタイムスタンプ以上のタイムスタンプを持つ最初のレコードにはseek
を割り当てることができます。あなたはKafkaConsumer#offsetsForTime()
でそれを使うことができます。これにより、対応するオフセットが返され、KafkaConsumer#seek()
にフィードできます。データを消費し、レコードのタイムスタンプフィールドをConsumerRecord#timestamp()
でチェックすると、いつ処理を停止できるかがわかります。
データは、オフセットによって厳密に順序付けられますが、タイムスタンプでは順序付けられません。したがって、処理中により小さいのタイムスタンプを持つ「遅い」レコードを取得する可能性があります(これらのレコードを単純にスキップすることもできます)。
検索間隔の終わりに、より困難な問題が遅く到着するレコードです。検索間隔よりも長いタイムスタンプを持つ最初のタイムスタンプを取得した後は、検索間隔の一部であるタイムスタンプ付きのレコードが残っている可能性があります(これらのレコードがトピック「late」に追加された場合)。しかしそれを知る方法はありません。したがって、「もう少し」のレコードを読み続け、「遅い」レコードがあるかどうかを確認することができます。 「いくつかのレコード」が意味することは、自分で作る必要のある意思決定です。
「書き込みパターン」に関する追加知識がある場合、検索間隔「終了」後に消費するレコードの数を決定するのに役立ちます。もちろん、2つのデフォルト戦略があります:(1)検索間隔よりも長いタイムスタンプを持つ最初のレコードで停止します(そして遅れて到着したレコードを効果的に無視します - 「ログ追加時間」設定を使用する場合はもちろん安全な戦略です) ); (2)ログの最後まで読んでください - これは完全性に関して最も安全な戦略ですが、余計なオーバーヘッドが発生する可能性があります(レコードはいつでも追加でき、レコード "遅延"ログの終わりに達した後に遅れたレコードが追加されることもあります)。
実際には、「最大期待遅延」について考え、この上限遅延よりも大きなタイムスタンプを持つレコードを取得するまで読むことをお勧めします。 「カフカのようそれはメッセージの挿入時にメッセージ作成時に、またはブローカーでプロデューサーで設定されるか、各メッセージは、メタデータのタイムスタンプ属性が含まれていますv0.10。」
- 1. kafka現在の時刻からデータを取得します
- 2. LiteDBからのデータ取得
- 3. nodejsのmongodbからのデータの取得
- 4. FireBaseからのデータの取得SWIFT
- 5. Swift 3.0のFirebaseからのデータ取得
- 6. Chromeからのデータの取得localStorage
- 7. React-Redux - Firebaseからのデータの取得
- 8. 抽象クラスからのデータの取得
- 9. wtformsからのデータの取得
- 10. ブラウザからJavaScriptへのデータの取得
- 11. ETSテーブルからのデータの取得
- 12. JSONからのデータの取得
- 13. jdbcからのデータの取得
- 14. データベースからJavaへのデータの取得
- 15. SQLからPHPへのデータの取得
- 16. タイムラインのデータベースからのデータ取得
- 17. メッセージキューからのデータの取得
- 18. リピータからのデータの取得
- 19. サービスからアクティビティへのデータの取得
- 20. マルチレベルアレイからのデータの取得
- 21. Inappbrowserからのデータの取得
- 22. mySQLクエリからのデータの取得
- 23. モデルからコントローラへのデータの取得
- 24. Firebaseからテーブルビューへのデータの取得
- 25. 外部プログラムからのデータの取得
- 26. Spark Structured Streamは、Kafkaの1つのパーティションからのメッセージを取得します
- 27. サーバーからのJSONデータの取得方法と取得方法
- 28. Spark StreamingからKafkaにデータをプッシュ
- 29. Kafkaアプリケーションから監視メトリックを取得する
- 30. kafka(0.10.x)からグループコミットオフセットを取得する方法
* * あなたはどのように説明することができますタイムスタンプはプロデューサによって定義されていますか?サーバーにタイムスタンプを提供させる方法はありますか? –
'ProducerRecord'にはそのコンストラクタの多重オーバーロードがあります。タイムスタンプパラメータ(long型)を受け付けるものもあります。ブローカ側のタイムスタンプについては、対応するトピック設定 'message.timestamp.type' cf https://kafka.apache.org/documentation/#topicconfigsを変更する必要があります –