spring-cloud-starter-stream-kafkaを使用して、春の雲ストリームを使用しています。私は次のようにapplication.propertiesにトピックをカフカために私のチャンネルを結合した:Spring Cloud Stream @ServiceActivatorが例外時にerrorChannelにメッセージを送信しない
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
私はエラーチャネルに例外メッセージを生成するために、私のプログラムを取得することができません。意外にも、私は別のスレッド(私は@Outputにメッセージをダンプする@MessagingGatewayを持っていても、それを生成しようとするように見えないし、残りの流れは非同期に発生します)。ここに私のServiceActivatorの定義があります:
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
FulfillingService {
@Override
@Audit(value = "annotatedEvent")
@ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
public void fulfill(TrivialRedemption redemption) throws Exception {
logger.error("FULFILLED!!!!!!");
throw new Exception("test exception");
}
}
ここではログが生成されています(私は完全な例外を切り捨てました)。無... errorChannelについて
- 苦情が任意の加入者
- カフカ生産者スレッドのログを持っていない
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {} 2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {} 2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {} 2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} ... ... 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {} 2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
はEDITあり:ここでは私のチャンネルクラスの内容です:
パブリックインターフェイスチャネル{
public static final String GATEWAY_OUTPUT = "gatewayOutput";
public static final String ENRICHING_INPUT = "enrichingInput";
public static final String ENRICHING_OUTPUT = "enrichingOutput";
public static final String REDEEMING_INPUT = "redeemingInput";
public static final String REDEEMING_OUTPUT = "redeemingOutput";
public static final String FULFILLING_INPUT = "fulfillingInput";
public static final String FULFILLING_OUTPUT = "fulfillingOutput";
@Output(GATEWAY_OUTPUT)
MessageChannel gatewayOutput();
@Input(ENRICHING_INPUT)
MessageChannel enrichingInput();
@Output(ENRICHING_OUTPUT)
MessageChannel enrichingOutput();
@Input(REDEEMING_INPUT)
MessageChannel redeemingInput();
@Output(REDEEMING_OUTPUT)
MessageChannel redeemingOutput();
@Input(FULFILLING_INPUT)
MessageChannel fulfillingInput();
@Output(FULFILLING_OUTPUT)
MessageChannel fulfillingOutput();
また、Spring Cloud Streamの将来のソリューションについては、https://github.com/spring-cloud/spring-cloud-stream/issues/538の進捗状況に従ってください。 –
総合的な答えをありがとう、私はまだそれを消化しています。私はChannels.classを追加しました.Adviceベースのアプローチを採用することの提案と同様に、ステートフルなリトライ動作にも興味があります。私はあなたの答えのいくつかの点について少し混乱しています。あなたは[バインダーは再試行で構成でき、例外をデッドレターのトピックにルーティングすることができます。 1.0.0.RELEASEにあるこのPRを参照してください。]私は1.0.0.RC2を使用しています。これはPRよりも前のようです。 1.0.0.RELEASEはいつリリースされるのですか? –
[先週リリースされました](https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available) –