2016-09-13 6 views
0

私は、異なるトピックのメッセージを送信し、それから聞きたいトピックで消費者を設定しようとしています。春の雲のストリームrabbitmq消費者の話題を指定します。

私の考えは、単一の宛先 "domainMessage"を使用し、カスタムパーティション戦略を使用することです。私は列挙型を持っており、私はpartitionKeyとしてその値を使用しているだけで、partitionStrategyはキーを返すだけです(キーは常にプロデューサ側のパーティション数と等しいと仮定します)。

これは機能しますか?もしそうなら、私は消費者をどのように設定するか分からない。

私のプロデューサーは、以下の持っている

@Override 
    public Object extractKey(Message<?> message) { 
     DomainMessage payload = (DomainMessage) message.getPayload(); 
     return payload.getType(); 
    } 

マイPartitionStrategyImplが

@Override 
    public int selectPartition(Object key, int partitionCount) { 
     return (int)key; 
    } 

マイ消費者application.propertiesのように見えます

のように見えるように

spring.cloud.stream.bindings.output.destination=domainMessages 
spring.cloud.stream.bindings.output.producer.partition-key-extractor-class=publisher.partitionstrategy.PartitionKeyExtractorImpl 
spring.cloud.stream.bindings.output.producer.partition-selector-class=publisher.partitionstrategy.PartitionSelectorStrategyImpl 
spring.cloud.stream.bindings.output.producer.partition-count=3 

マイPartitionKeyExtractorImplが見えますapplication.properties

spring.cloud.stream.bindings.input.destination=domainMessage 
spring.cloud.stream.bindings.input.group=group01 
spring.cloud.stream.bindings.input.consumer.partitioned=true 
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true 

たとえば、payload.getType()は1〜3の値を返すことができます。 partitionKeyを1と3にしたメッセージだけを聞くようにコンシューマーを設定するにはどうすればよいですか?

答えて

0

春のクラウドストリームは、キューや交換の宣言に関して非常に賛成です。

spring.cloud.stream.bindings.input.destination=domainMessage 
spring.cloud.stream.bindings.input.group=group01 
spring.cloud.stream.bindings.input.consumer.partitioned=true 
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true 
spring.cloud.stream.bindings.input.consumer.instance-index=0 

(インスタンスインデックスに気付く)

これはキーdomainMessage-0ルーティングとdomainMessageを交換するdomainMessage.group01-0に結合するであろう。

あなたは、単一のインスタンスで複数のパーティションを消費したい場合には、対応する交換機(例えば、各キューを結合するので、単に

spring.cloud.stream.bindings.input.destination=domainMessage-0,domainMessage-1 

(およびinstance-indexpartitioned=trueを削除)を実行すると、動作しませんdomainMessage-0ルーティングキー#)。

適切なルーティングキーを使用してアップストリーム交換機(domainMessage)に各コンシューマ交換機(domainMessage-n)をバインドするために交換機間バインディングを手動で追加します。

現在、1つのinstance-indexで複数のパーティションから自動的に消費する方法はありません。

+0

この機能は今後サポートされる予定ですか?ちょうど好奇心が強い – user3344591

+0

それは前に出てきたものではありません。通常は、パーティション分割とキー選択を設定してメッセージを1つのパーティションに移動します。つまり、ケース0の場合は0、1をパーティション0に送信します。[GitHub Issue](https://github.com/spring -cloud/spring-cloud-stream/issues)を提示してください。 –

+0

開封 https://github.com/spring-cloud/spring-cloud-stream/issues/648 – user3344591

関連する問題