2つのカフカトピックが異なるソースから正確に同じコンテンツをストリーミングするので、ソースのいずれかが失敗した場合に高い可用性を得ることができます。 Kafka Streams 0.10.1.0を使用して、2つのトピックを1つの出力トピックにマージしようとしています。そのため、すべてのソースが上がっても重大なエラーメッセージは表示されません。複数の同一のカフカストリームのトピックをマージする
KStreamのleftJoin
メソッドを使用すると、トピックの1つが問題なく表示されます(セカンダリトピック)が、プライマリトピックがダウンすると、出力トピックに何も送信されません。これは
KStream-KStream leftJoinは常にプライマリストリームから到着したレコードによって駆動され、Kafka Streams developer guideによると、理由のようです
ので
主流からレコードがない場合は、それをセカンダリストリームのレコードは存在しても使用されません。一次ストリームがオンラインに戻ると、出力は正常に再開します。
私も重複を取り除くためにKTableとgroupByKeyへの変換が続く(重複レコードが追加されている)outerJoin
を使用して試してみた
、
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
が、私はまだたまに重複を取得します。私はまたcommit.interval.ms=200
を使用して、KTableが出力ストリームに十分頻繁に送信されるようにします。
複数の同一の入力トピックから正確に一度の出力を得るために、このマージに最も近い方法は何でしょうか?
一般的に、問題を解決するためにProcessor APIをお勧めします。現在の 'trunk'バージョンに切り替えることもできます(これはあなたにとって可能ですか)。結合が修正され、問題が解決する可能性があります:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics新しい結合の意味はKafka '0.10.2'に含まれます目標リリース日は2017年1月(https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan)です。 –
@ MatthiasJ.Sax私はトランクに切り替えました。そして、 'leftJoin'がKStream-KStreamジョインのための' outerJoin'のように振る舞うようになったので、私は10.1セマンティクスに戻ります。私が今試みているのは、元のものをleftJoinでプライマリとして使用し、leftJoinでセカンダリを使ってnullを出力する偽のストリームを作成することです。私はプライマリがダウンしていても(これは最初のleftJoinからnullを取得するため)、これがプライマリストリームに常に値を持つことを望みます。 –
新しい 'leftJoin'は、古い' outerJoin'も同様に両側からトリガーします(これは、leftJoinが現在outerJoinのように動作しているように見えるのでしょうか?) - これは古い 'leftJoin'よりもSQLセマンティクスに近いですが、' leftJoin'は 'outerJoin'とはまだ異なります:右手側のトリガーがあり、結合相手が見つからなければ、レコードを落とし、結果は出ません。 –