2016-05-05 5 views
3

入力チャネルからのメッセージをフィルタリングし、出力上でそれらを他のものに変換するモジュールを作成したいとします。プロセッサモジュール内のSpring XDフィルタリング

@Filter(inputChannel = "input", outputChannel = "output") 
public boolean accept(final Message<?> message) { 
    final MyObject payload = (MyObject) message.getPayload(); 
    return payload.getName().equals("test"); 
} 


@Transformer(inputChannel = "input", outputChannel = "output") 
public OtherObject transform(final MyObject data) { 
    return convert(data); 
} 

が、私は単一のモジュールでこれを行うしたいと思います:私は、私のような二つのモジュール(私はスクリプトを超えるJavaコードを好む)でこれを区切ることができます知っています。フィルタリングロジックをtransfomerモジュールに移して、許容できないペイロードにnull値を返すと、私はspring-xdランタイム例外を取得し始めます。これに対する正しいアプローチは何でしょうか?

--EDIT--

構成:

@Configuration 
@EnableIntegration 
public class ModuleConfiguration { 

@Bean 
public MessageChannel input() { 
    return new DirectChannel(); 
} 

@Bean 
public MessageChannel output() { 
    return new DirectChannel(); 
} 

@Bean 
public MessageChannel myChannel() { 
    return new DirectChannel(); 
} 

@Bean 
public MyFilter filter() { 
    return new MyFilter(); 
} 

@Bean 
public MyTransformer transformer() { 
    return new MyTransformer(); 
} 
} 

フィルタ:

@Filter(inputChannel = "input", outputChannel = "myChannel") 
public boolean accept(final Message<?> message) 

トランスフォーマー:

@Transformer(inputChannel = "myChannel", outputChannel = "output") 
public OtherObject transform(final MyObject payload) 

例外:

2016-05-13T11:17:59+0200 1.3.1.RELEASE WARN xdbus.tt.0-1 listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed. 
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73] 
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'JavaConfiguredModule [name=myFilter, type=processor, group=tt, index=1 @7d48b140]:default,admin,singlenode,hsqldbServer:9393.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93) ~[spring-integration-amqp-4.2.5.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na] 
    ... 10 common frames omitted 
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na] 
    ... 33 common frames omitted 

答えて

2

あなたが見つけた通り、変圧器を返す必要があります。

Spring Integration documentationを参照してください(各XDプロセッサモジュールは、入力と出力チャンネルを持つ小さなSpring Integrationアプリケーションです。ソースには出力があり、シンクには入力があります)。

あなたが今持っている方法では、inputに2人の消費者がいます。メッセージはラウンドロビン配信されます。あなたは、(第3のメッセージチャネルを介して)メッセージフローに二つの成分を配線する必要が

...

@Filter(inputChannel = "input", outputChannel = "transformerChannel") 
public boolean accept(final Message<?> message) { 
    final MyObject payload = (MyObject) message.getPayload(); 
    return payload.getName().equals("test"); 
} 


@Transformer(inputChannel = "transformerChannel", outputChannel = "output") 
public OtherObject transform(final MyObject data) { 
    return convert(data); 
} 

は、チャネル構成に注目してください。

+0

こんにちは、ご協力いただきありがとうございます。 これらのモジュールは別々の設定で展開されていますが、メッセージは正しく配布されていると思いますが、確認する必要があります。 しかし、私の質問のポイントは、単一のモジュールでペイロードをフィルタリングして変換することが可能であるということでしたか?メッセージバスに何かを置く必要はありませんか? ありがとう... – aturkovic

+0

上記のシナリオではどちらのアプローチも効果がありますが、それらは1つのモジュールで 'transformerChannel'を介して直接配線されています。それらがすでに別々のモジュールとして構成されてアップロードされている場合、それらを1つのモジュールに構成することができ、バスを避けて配線されます。 [the docs](http://docs.spring.io/spring-xd/docs/1.3.1.RELEASE/reference/html/#composing-modules)を参照してください。モジュールcompose foo --definition "filter | transfomer"ストリーム定義の 'foo'をモジュールとして使うことができます。 –

+0

"バスを避けて一緒に配線されます" - これは直接通信することを意味しますか?またはいくつかのJavaキュー経由で?もう一度ありがとう... – aturkovic

関連する問題