2016-11-24 2 views
7

2つのカフカトピックが異なるソースから正確に同じコンテンツをストリーミングするので、ソースのいずれかが失敗した場合に高い可用性を得ることができます。 Kafka Streams 0.10.1.0を使用して、2つのトピックを1つの出力トピックにマージしようとしています。そのため、すべてのソースが上がっても重大なエラーメッセージは表示されません。複数の同一のカフカストリームのトピックをマージする

KStreamのleftJoinメソッドを使用すると、トピックの1つが問題なく表示されます(セカンダリトピック)が、プライマリトピックがダウンすると、出力トピックに何も送信されません。これは

KStream-KStream leftJoinは常にプライマリストリームから到着したレコードによって駆動され、Kafka Streams developer guideによると、理由のようです

ので

主流からレコードがない場合は、それをセカンダリストリームのレコードは存在しても使用されません。一次ストリームがオンラインに戻ると、出力は正常に再開します。

私も重複を取り除くためにKTableとgroupByKeyへの変換が続く(重複レコードが追加されている) outerJoinを使用して試してみた

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

が、私はまだたまに重複を取得します。私はまたcommit.interval.ms=200を使用して、KTableが出力ストリームに十分頻繁に送信されるようにします。

複数の同一の入力トピックから正確に一度の出力を得るために、このマージに最も近い方法は何でしょうか?

+0

一般的に、問題を解決するためにProcessor APIをお勧めします。現在の 'trunk'バージョンに切り替えることもできます(これはあなたにとって可能ですか)。結合が修正され、問題が解決する可能性があります:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics新しい結合の意味はKafka '0.10.2'に含まれます目標リリース日は2017年1月(https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan)です。 –

+0

@ MatthiasJ.Sax私はトランクに切り替えました。そして、 'leftJoin'がKStream-KStreamジョインのための' outerJoin'のように振る舞うようになったので、私は10.1セマンティクスに戻ります。私が今試みているのは、元のものをleftJoinでプライマリとして使用し、leftJoinでセカンダリを使ってnullを出力する偽のストリームを作成することです。私はプライマリがダウンしていても(これは最初のleftJoinからnullを取得するため)、これがプライマリストリームに常に値を持つことを望みます。 –

+0

新しい 'leftJoin'は、古い' outerJoin'も同様に両側からトリガーします(これは、leftJoinが現在outerJoinのように動作しているように見えるのでしょうか?) - これは古い 'leftJoin'よりもSQLセマンティクスに近いですが、' leftJoin'は 'outerJoin'とはまだ異なります:右手側のトリガーがあり、結合相手が見つからなければ、レコードを落とし、結果は出ません。 –

答えて

5

いずれの種類の結合を使用しても問題は解決しません。一部のストリームがストールした場合は内部結合、またはnull(左結合または外部結合両方のストリームがオンラインの場合)。 Kafkaストリームの結合セマンティクスの詳細については、https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semanticsを参照してください。

したがって、私はあなたがprocess()transform()、またはtransformValues()KStreamを使用してDSLと-とマッチを混在させることができ、プロセッサのAPIを使用することをお勧めします。詳細は、How to filter keys and value with a Processor using Kafka Stream DSLを参照してください。

また、プロセッサにカスタムストアを追加して(How to add a custom StateStore to the Kafka Streams DSL processor?)、重複フィルタ処理のフォールトトレラントを作成することもできます。

関連する問題