入力チャネルからのメッセージをフィルタリングし、出力上でそれらを他のものに変換するモジュールを作成したいとします。プロセッサモジュール内のSpring XDフィルタリング
@Filter(inputChannel = "input", outputChannel = "output")
public boolean accept(final Message<?> message) {
final MyObject payload = (MyObject) message.getPayload();
return payload.getName().equals("test");
}
@Transformer(inputChannel = "input", outputChannel = "output")
public OtherObject transform(final MyObject data) {
return convert(data);
}
が、私は単一のモジュールでこれを行うしたいと思います:私は、私のような二つのモジュール(私はスクリプトを超えるJavaコードを好む)でこれを区切ることができます知っています。フィルタリングロジックをtransfomerモジュールに移して、許容できないペイロードにnull値を返すと、私はspring-xdランタイム例外を取得し始めます。これに対する正しいアプローチは何でしょうか?
--EDIT--
構成:
@Configuration
@EnableIntegration
public class ModuleConfiguration {
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel output() {
return new DirectChannel();
}
@Bean
public MessageChannel myChannel() {
return new DirectChannel();
}
@Bean
public MyFilter filter() {
return new MyFilter();
}
@Bean
public MyTransformer transformer() {
return new MyTransformer();
}
}
フィルタ:
@Filter(inputChannel = "input", outputChannel = "myChannel")
public boolean accept(final Message<?> message)
トランスフォーマー:
@Transformer(inputChannel = "myChannel", outputChannel = "output")
public OtherObject transform(final MyObject payload)
例外:
2016-05-13T11:17:59+0200 1.3.1.RELEASE WARN xdbus.tt.0-1 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'JavaConfiguredModule [name=myFilter, type=processor, group=tt, index=1 @7d48b140]:default,admin,singlenode,hsqldbServer:9393.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
... 33 common frames omitted
こんにちは、ご協力いただきありがとうございます。 これらのモジュールは別々の設定で展開されていますが、メッセージは正しく配布されていると思いますが、確認する必要があります。 しかし、私の質問のポイントは、単一のモジュールでペイロードをフィルタリングして変換することが可能であるということでしたか?メッセージバスに何かを置く必要はありませんか? ありがとう... – aturkovic
上記のシナリオではどちらのアプローチも効果がありますが、それらは1つのモジュールで 'transformerChannel'を介して直接配線されています。それらがすでに別々のモジュールとして構成されてアップロードされている場合、それらを1つのモジュールに構成することができ、バスを避けて配線されます。 [the docs](http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#composing-modules)を参照してください。モジュールcompose foo --definition "filter | transfomer"ストリーム定義の 'foo'をモジュールとして使うことができます。 –
"バスを避けて一緒に配線されます" - これは直接通信することを意味しますか?またはいくつかのJavaキュー経由で?もう一度ありがとう... – aturkovic