2017-09-22 6 views
1

私はPublishSubscribeChannelがどのように機能するかを理解したいので、私は小さな例実装しました:春の統合DSL:PublishSubscribeChannel順

@Bean 
public MessageSource<?> integerMessageSource() { 
    MethodInvokingMessageSource source = new MethodInvokingMessageSource(); 
    source.setObject(new AtomicInteger()); 
    source.setMethodName("getAndIncrement"); 
    return source; 
} 



@Bean 
public IntegrationFlow mainFlow() { 
    // @formatter:off 
    return IntegrationFlows 
     .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000))) 
     .publishSubscribeChannel(pubSub -> pubSub 
      .subscribe(flow -> flow 
       .handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload()))) 
      .subscribe(flow -> flow 
       .handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload()))) 
      .subscribe(flow -> flow 
       .transform(source -> MessageBuilder.withPayload("Error").build()) 
       .handle(message -> { 
        LOG.info("Error"); 
       })) 
      .subscribe(flow -> flow 
       .handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload()))) 
     ) 
     .get(); 
    // @formatter:on 
} 

を私は出力として見ることを期待しました:

Handling message, step 1... 
Handling message, step 2... 
Error 
Handling message, step 4... 

しかし、3番目のサブフロー( "エラー"出力付き)は常に最初に処理されます。私は、ステップ1、2、および4ために私を定義しようとすると、私は(警告を)次のコンソール出力を得る:

o.s.integration.dsl.GenericEndpointSpec : 'order' can be applied only for AbstractMessageHandler 

私は、加入者が呼び出されることを期待したサブスクリプションのために、しかし、これはそうではないようです。

私はSpring Boot 1.5.4とSpring Integration 4.3.10を使用しています。

答えて

1

問題がラムダハンドラがOrderedないことである - パブ/サブチャネルのための一般的な契約は(順番に)最初Ordered加入者を呼び出すことであり、次いで、順不同加入。

lambdasは複数のインターフェイスを実装できないため、私たちができることは何もわかりません。回避策として

すべての加入者がOrderedされるように、あなたが何かのような...

@Bean 
public IntegrationFlow mainFlow() { 
    // @formatter:off 
    return IntegrationFlows 
     .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000))) 
     .publishSubscribeChannel(pubSub -> pubSub 
      .subscribe(flow -> flow 
       .handle(handler("Handling message, step 1: {}"))) 
      .subscribe(flow -> flow 
       .handle(handler("Handling message, step 2: {}"))) 
      .subscribe(flow -> flow 
       .transform(message -> "Error") 
       .handle(message -> { 
        LOG.info("Error"); 
       })) 
      .subscribe(flow -> flow 
       .handle(handler("Handling message, step 4: {}"))) 
     ) 
     .get(); 
    // @formatter:on 
} 

private MessageHandler handler(String format) { 
    return new AbstractMessageHandler() { 

     @Override 
     protected void handleMessageInternal(Message<?> message) throws Exception { 
      LOG.info(format, message.getPayload()); 
     } 

    }; 

} 

を行うことができます。

EDIT

ここで少し簡単に回避策だ - そう、すべてのサブフロー最初のコンポーネントがOrderedを実装する代わりに、ラムダのブリッジとサブフローを起動するには...

@Bean 
public IntegrationFlow mainFlow() { 
    // @formatter:off 
    return IntegrationFlows 
     .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000))) 
     .publishSubscribeChannel(pubSub -> pubSub 
      .subscribe(flow -> flow 
       .bridge(e -> e.id("s1")) 
       .handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload()))) 
      .subscribe(flow -> flow 
       .bridge(e -> e.id("s2")) 
       .handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload()))) 
      .subscribe(flow -> flow 
       .transform(source -> MessageBuilder.withPayload("Error").build()) 
       .handle(message -> { 
        LOG.info("Error"); 
       })) 
      .subscribe(flow -> flow 
       .bridge(e -> e.id("s4")) 
       .handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload()))) 
     ) 
     .get(); 
    // @formatter:on 
} 
+0

[OK]を、おかげで、これは解決します問題!私は、.handle()メソッドで "order"を使うと思っていました。入力としてラムダを使うこともできる.transform()メソッドが可能です。 .handle()と.transform()の違いは何ですか? – matthjes

+0

'.handle()'は 'MessageHandler'実装(ラムダ)を返します。 '.transform(.. any ...)'( '.transform()')でも 'Orderable'を実装する' MessageTransformingHandler'を返します。 '.transform'の場合、ラムダは' Transformer'であり、 'MessageHandler'ではありません。 –

+0

もう少しコードを必要としない別の作業を追加しました。 EDITを参照してください。 私は[INT-4347](https://jira.spring.io/browse/INT-4347)を開いてこれを改善できるかどうかを確認しました。 –