2017-08-16 3 views
3

RabbitMQでSpringクラウドストリームを使用する理由を理解しようとしています。私はRabbitMQ Springチュートリアル4(https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html)を見てきました。これは基本的に私がやりたいことです。それは、2つのキューが接続された直接交換を作成し、ルーティングキーに応じて、メッセージがQ1またはQ2にルーティングされる。Spring Cloud Stream RabbitMQ

チュートリアルを見て、すべてのパーツを作成し、一緒にバインドして行く準備ができたら、全体のプロセスはかなり簡単です。

私はSing Cloud Streamを使用する際にどのようなメリットがあるのだろうと思っていました。単純な交換を作成したり、宛先を定義することも簡単でしたし、グループはストリーミングで簡単でした。だから私はさらに進んでストリームでチュートリアルのケースを処理しようとしない理由を考えました。

私はストリームがBinderAwareChannelResolverと同じことをしているのを見ました。しかし私はRabbitMQ Springチュートリアルと同じようにすべてをまとめるのに苦労しています。トリックに

spring.cloud.stream.bindings.output.destination=myDestination 
spring.cloud.stream.bindings.output.group=consumerGroup 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key' 

必要があります:それは依存関係の問題である場合、私はわからないが、私は基本的にここで何かを誤解しているようだ、私は次のように思いました。

基本的にダイレクト交換を作成し、2つのキューをバインドし、のような2つのキューのいずれかへのルーティングキールートに応じて、ソースとシンクの最小の例を持つ人はいますか?

EDIT:以下

私が尋ね行う方法を示すコードの最小セットです。それは単純です(しかし、誰もが興味を持っている場合、私に知らせて)として、私は

application.propertiesbuild.gradleを添付していない:セットアッププロデューサー

spring.cloud.stream.bindings.output.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type 

Sources.class:セットアッププロデューサーが

public interface Sources { 

    String OUTPUT = "output"; 

    @Output(Sources.OUTPUT) 
    MessageChannel output(); 
} 
チャンネル

StatusController.class:残りの通話に応答し、特定のルーティングキーを使用してメッセージを送信する

/** 
* Status endpoint for the health-check service. 
*/ 
@RestController 
@EnableBinding(Sources.class) 
public class StatusController { 

    private int index; 

    private int count; 

    private final String[] keys = {"orange", "black", "green"}; 

    private Sources sources; 

    private StatusService status; 

    @Autowired 
    public StatusController(Sources sources, StatusService status) { 
     this.sources = sources; 
     this.status = status; 
    } 

    /** 
    * Service available, service returns "OK"'. 
    * @return The Status of the service. 
    */ 
    @RequestMapping("/status") 
    public String status() { 
     String status = this.status.getStatus(); 

     StringBuilder builder = new StringBuilder("Hello to "); 
     if (++this.index == 3) { 
      this.index = 0; 
     } 
     String key = keys[this.index]; 
     builder.append(key).append(' '); 
     builder.append(Integer.toString(++this.count)); 
     String payload = builder.toString(); 
     log.info(payload); 

     // add kv pair - routingkeyexpression (which matches 'type') will then evaluate 
     // and add the value as routing key 
     Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key)); 
     sources.output().send(msg); 

     // return rest call 
     return status; 
    } 
} 
物事の

消費者側、プロパティ:

spring.cloud.stream.bindings.input.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange 
spring.cloud.stream.bindings.inputer.destination=tut.direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct 
spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black 

Sinks.class

public interface Sinks { 

    String INPUT = "input"; 

    @Input(Sinks.INPUT) 
    SubscribableChannel input(); 

    String INPUTER = "inputer"; 

    @Input(Sinks.INPUTER) 
    SubscribableChannel inputer(); 
} 

ReceiveStatus.class:ステータスを受信:

@EnableBinding(Sinks.class) 
public class ReceiveStatus { 
    @StreamListener(Sinks.INPUT) 
    public void receiveStatusOrange(String msg) { 
     log.info("I received a message. It was orange number: {}", msg); 
    } 

    @StreamListener(Sinks.INPUTER) 
    public void receiveStatusBlack(String msg) { 
     log.info("I received a message. It was black number: {}", msg); 
    } 
} 

答えて

3

春クラウドストリームは、イベント駆動型マイクロサービスアプリケーションを開発することができますアプリケーションが(@EnableBindingを介して)eに接続できるようにするSpring Cloud Stream Binder実装(Kafka、RabbitMQ、JMSバインダーなど)を使用する外部メッセージングシステムどうやら、Spring Cloud StreamはRabbitMQバインダーの実装にSpring AMQPを使用しています。

BinderAwareChannelResolverは、プロデューサの動的バインディングサポートに適用されます。あなたのケースでは、エクスチェンジの設定と、そのエクスチェンジへのコンシューマのバインディングに関するものだと思います。

たとえば、条件に基づいて適切なbindingRoutingKeyが設定された2人の消費者と、上記の(グループを除く)プロパティ(ルーティングキー式、宛先)を持つ1人のプロデューサーが必要です。アウトバウンドチャネルにgroupを設定したことに気付きました。 groupプロパティは、コンシューマ(したがってインバウンド)にのみ適用されます。

また、https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57と私はrouting-key-expressionを使用していくつかの議論を見ることができます。具体的には、式の値を使用してthisをチェックします。

+0

答えに感謝します。私はすでに言及した問題を見ていると、彼らは私が実際にこのstackoverflowの質問をした理由です。自分がするべきことは他の人には明らかだが、私にとってはそうではない。 'BinderAwareChannelResolver'に関するあなたの説明は、私が正しいコーナーに到達したことを理解しています:)。しかし、私はrouting-key-expressionを設定しようとしましたが、うまくいきませんでした。 gradleの依存関係の問題のようですが、私はそれをうまく動作させることはできませんでした。だから私はなぜこのプロジェクトの例を聞いたのですか? – maiksensi

+0

私はコンシューマー側に以下のように働いているようです: 'spring.cloud.stream.bindings.input.destination = tut.direct spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType = direct spring.cloud .stream.rabbit.bindings.input.consumer.bindingRoutingKey = orange spring.cloud.stream.bindings.inputer.destination = tut.direct spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType = direct spring .cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey = black' プロデューサ側でルーティングキーを動的に設定するだけです。 – maiksensi

関連する問題