2017-11-06 1 views
2

現在、私は、カフカストリームでレコードを分類するための、直接的かつ効率的な方法を探しています。
すべてのレコードには、少なくともidfailedのプロパティが含まれています。
idは単なる文字列であるとfailedはブールです)カフカストリームによる単純分類

アイデアは、初めに、「メッセージ」として、すべての着信記録を分類することです。
受信したレコードの1つに失敗したフィールドが設定されている場合、これはどこかに「持続」され、レコードは「失敗」として分類されます。

idの受信レコードは、failedプロパティが設定されていても、「失敗」として分類される必要があります。

私は、カフカストリームの内部状態ストア(対話型クエリ機能と一緒に)を使用するか、レコードが入るたびにクエリされる外部データベースを使用することを考えています。カフカの州立記憶は、より軽量なソリューションのように。

問題を理解するのに役立つ小さなコンセプトスケッチがあります。 enter image description here

誰かがこれを正しい方法に対処する方法について考えていますか?

は すべてのベスト ありがとう - ティム

答えて

3

あなたのアプローチは、私にはいいですね。 IQ機能が必要だとは思わない。カスタムTransformerを定義し、それにキー値ストアを添付してください。処理中にfailed=trueというメッセージが表示された場合は、そのIDを店舗に入れます。 failed=falseの着信メッセージごとに、前回失敗した同じIDのメッセージがあるかどうかを確認するためにストアをさらにチェックします。失敗したメッセージを永続化する

、あなたはもしかしたらbranch()を使用し、特別なトピックへfailedメッセージを書き込む(二つにあなたのストリームを分割します。

+1

このアプローチは本当にきれいで固体に見えますが、非常に多くのマティアス、ありがとうございました。私は実装しています今すぐ。 –

関連する問題