2016-05-13 5 views
3

spring-cloud-starter-stream-kafkaを使用して、春の雲ストリームを使用しています。私は次のようにapplication.propertiesにトピックをカフカために私のチャンネルを結合した:Spring Cloud Stream @ServiceActivatorが例外時にerrorChannelにメッセージを送信しない

spring.cloud.stream.bindings.gatewayOutput.destination=received 
spring.cloud.stream.bindings.enrichingInput.destination=received 
spring.cloud.stream.bindings.enrichingOutput.destination=enriched 
spring.cloud.stream.bindings.redeemingInput.destination=enriched 
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed 
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed 
spring.cloud.stream.bindings.error.destination=errors12 
spring.cloud.stream.bindings.errorInput.destination=errors12 
spring.cloud.stream.bindings.errorOutput.destination=errors12 

私はエラーチャネルに例外メッセージを生成するために、私のプログラムを取得することができません。意外にも、私は別のスレッド(私は@Outputにメッセージをダンプする@MessagingGatewayを持っていても、それを生成しようとするように見えないし、残りの流れは非同期に発生します)。ここに私のServiceActivatorの定義があります:

@Named 
@Configuration 
@EnableBinding(Channels.class) 
@EnableIntegration 
public class FulfillingServiceImpl extends AbstractBaseService implements 
     FulfillingService { 

    @Override 
    @Audit(value = "annotatedEvent") 
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false") 
    public void fulfill(TrivialRedemption redemption) throws Exception { 

     logger.error("FULFILLED!!!!!!"); 

     throw new Exception("test exception"); 

    } 
} 

ここではログが生成されています(私は完全な例外を切り捨てました)。無... errorChannelについて

  • 苦情が任意の加入者
  • カフカ生産者スレッドのログを持っていない
 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} 
... 
... 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 

はEDITあり:ここでは私のチャンネルクラスの内容です:

パブリックインターフェイスチャネル{

public static final String GATEWAY_OUTPUT = "gatewayOutput"; 

public static final String ENRICHING_INPUT = "enrichingInput"; 
public static final String ENRICHING_OUTPUT = "enrichingOutput"; 

public static final String REDEEMING_INPUT = "redeemingInput"; 
public static final String REDEEMING_OUTPUT = "redeemingOutput"; 

public static final String FULFILLING_INPUT = "fulfillingInput"; 
public static final String FULFILLING_OUTPUT = "fulfillingOutput"; 

@Output(GATEWAY_OUTPUT) 
MessageChannel gatewayOutput(); 

@Input(ENRICHING_INPUT) 
MessageChannel enrichingInput(); 

@Output(ENRICHING_OUTPUT) 
MessageChannel enrichingOutput(); 

@Input(REDEEMING_INPUT) 
MessageChannel redeemingInput(); 

@Output(REDEEMING_OUTPUT) 
MessageChannel redeemingOutput(); 

@Input(FULFILLING_INPUT) 
MessageChannel fulfillingInput(); 

@Output(FULFILLING_OUTPUT) 
MessageChannel fulfillingOutput(); 

答えて

0

Channelsクラスは表示されませんが、バインダーはエラーチャンネルが「特別」であることを認識しません。

バインドを再試行して設定し、例外をデッドレターのトピックにルーティングすることができます。 1.0.0.RELEASEにあるthis PRを参照してください。

@MessageEndpoint 
public static class GatewayInvoker { 

    @Autowired 
    private ErrorHandlingGateway gw; 

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT) 
    public void send(Message<?> message) { 
     this.gw.send(message); 
    } 

} 

@Bean 
public GatewayInvoker gate() { 
    return new GatewayInvoker(); 
} 

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS) 
public interface ErrorHandlingGateway { 

    void send(Message<?> message); 

} 

変更あなたのサービスの活性化因子の入力チャンネルtoServiceをする: -

また、あなたがサービスアクティベータの前に「ミッドフロー」のゲートウェイを追加することができますJavaで「のtry/catch」ブロックのようにそれを考えます。

あなたはフレームワークが@MessagingGatewayインタフェースを検出して、そのプロキシを構築することができますので、お使いのコンフィギュレーションクラスに@IntegrationComponentScanを追加する必要があります。

EDIT

ちょうど私に示唆した別の方法は、あなたのサービスの活性化因子のアドバイスチェーンでExpressionEvaluatingAdviceを追加することです。

+0

また、Spring Cloud Streamの将来のソリューションについては、https://github.com/spring-cloud/spring-cloud-stream/issues/538の進捗状況に従ってください。 –

+0

総合的な答えをありがとう、私はまだそれを消化しています。私はChannels.classを追加しました.Adviceベースのアプローチを採用することの提案と同様に、ステートフルなリトライ動作にも興味があります。私はあなたの答えのいくつかの点について少し混乱しています。あなたは[バインダーは再試行で構成でき、例外をデッドレターのトピックにルーティングすることができます。 1.0.0.RELEASEにあるこのPRを参照してください。]私は1.0.0.RC2を使用しています。これはPRよりも前のようです。 1.0.0.RELEASEはいつリリースされるのですか? –

+0

[先週リリースされました](https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available) –

関連する問題