2017-02-23 7 views
3

1つのマスタートピックから複数のストリームを作成するにはどうすればよいですか?私はこのような何か行うと:単一のマスタートピックの複数のストリーム

KStreamBuilder builder = new KStreamBuilder(); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output1"); 

builder.stream(Serdes.String(), Serdes.String(), "master") 
      /* Filtering logic */ 
      .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 

を私は次のエラーを取得:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source. 
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347) 
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92) 

を私は「マスター」から、各ストリームのためKafkaStreamsの別のインスタンスを作成する必要がありますか?

答えて

7

あなたが再利用できることKStreamを作成することができます。

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master"); 

を、あなたはそれを再利用することができます

inputStream.filter(..logic1) 
     .to(Serdes.String(), Serdes.String(), "output1"); 
inputStream.filter(..logic2) 
     .to(Serdes.String(), Serdes.String(), "output2"); 

KafkaStreams streams = new KafkaStreams(builder, /* config */); 
streams.start(); 
+2

あなたのフィルタで重複を持っていない場合、あなたはまた '使用することができます入力ストリーム.branch() 'はオーバーラップしないサブストリームを返します。 –

関連する問題