私はPublishSubscribeChannelがどのように機能するかを理解したいので、私は小さな例実装しました:春の統合DSL:PublishSubscribeChannel順
@Bean
public MessageSource<?> integerMessageSource() {
MethodInvokingMessageSource source = new MethodInvokingMessageSource();
source.setObject(new AtomicInteger());
source.setMethodName("getAndIncrement");
return source;
}
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
)
.get();
// @formatter:on
}
を私は出力として見ることを期待しました:
Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...
しかし、3番目のサブフロー( "エラー"出力付き)は常に最初に処理されます。私は、ステップ1、2、および4ために私を定義しようとすると、私は(警告を)次のコンソール出力を得る:
o.s.integration.dsl.GenericEndpointSpec : 'order' can be applied only for AbstractMessageHandler
私は、加入者が呼び出されることを期待したサブスクリプションのために、しかし、これはそうではないようです。
私はSpring Boot 1.5.4とSpring Integration 4.3.10を使用しています。
[OK]を、おかげで、これは解決します問題!私は、.handle()メソッドで "order"を使うと思っていました。入力としてラムダを使うこともできる.transform()メソッドが可能です。 .handle()と.transform()の違いは何ですか? – matthjes
'.handle()'は 'MessageHandler'実装(ラムダ)を返します。 '.transform(.. any ...)'( '.transform()')でも 'Orderable'を実装する' MessageTransformingHandler'を返します。 '.transform'の場合、ラムダは' Transformer'であり、 'MessageHandler'ではありません。 –
もう少しコードを必要としない別の作業を追加しました。 EDITを参照してください。 私は[INT-4347](https://jira.spring.io/browse/INT-4347)を開いてこれを改善できるかどうかを確認しました。 –