2016-12-04 6 views
2

私は、Spring Integration DSLフローを使用して、残りのAPIからデータを取得し、変換して別の残りのAPIに送信します。Spring Integration Queueエラー処理

データがフェッチされた後、残りの処理を行うキューチャネルにメッセージが送信されます。キューが動作している間に、元のスレッドが移動してより多くのデータを取得します。

問題は、キューからスローされたエラーはすべてのデータの処理が完了するまで処理されないが、処理を停止してすぐにエラーをスローする必要があるためです。長い時間が、私はそれが最初に見つかったエラーで停止したい。

ゲートウェイ:

@MessagingGateway(errorChannel = "syncErrorChannel") 
@Service 
public interface CrmGateway { 
    @Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput") 
    @Payload("new String()") 
    Object syncDepartments(); 
} 

フロー:

/** 
    * Fetches data from the source api and passes it on to the split channel to process it If the 
    * response indicates it has more data to fetch then it is also loaded 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow sync() { 
    return IntegrationFlows 
     .from("departmentSyncInput") 
     .handle(this::fetchDomain) 
     .enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()"))) 
     .routeToRecipients(r -> r 
     .recipient("departmentSplitChannel") 
     .recipient(
      "departmentSyncInput", 
      p -> p.getPayload() instanceof Wrapper 
      && ((Wrapper) p.getPayload()).getNext() != null 
     )) 
     .get(); 
    } 

    /** 
    * Split data from the api into individual models and send them to the target service 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow split() { 
    return IntegrationFlows 
     .from("departmentSplitChannel") 
     .transform(Wrapper.class, Wrapper::getContent) 
     .split() 
     .channel(c -> c.executor(Executors.newScheduledThreadPool(100))) 
     .enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel")) 
     .handle((payload, headers) -> log("Syncing", payload, payload)) 
     .transform(Department.class, transformer) 
     // exception happens here 
     .handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload)) 
     .handle((payload, headers) -> log("Synced", payload, payload)) 
     .aggregate() 
     .get(); 
    } 

エラーハンドラ:

@Bean 
    IntegrationFlow errorHandler() { 
    return IntegrationFlows 
     .from("syncErrorChannel") 
     .handle(Exception.class, (payload, headers) -> { 
     payload.printStackTrace(); 
     return payload; 
     }) 
     .get(); 
    } 

私も同じ結果とIntegrationFlows.from("errorChannel")を使用してみました。

私もFutureを使用しようとしましたが、同じように動作するので、get()と呼んだときにエラーが発生しますが、これは最後に起こっています。

ありがとうございました。

答えて

0

フローにはqueueチャネル定義はありませんが、.channel(c -> c.executor())ということになります。あなたも同様にログを共有する方が良いでしょう。

ヘッダー(ゲートウェイの場合はTemporaryReplyChannel)をオーバーライドしようとします。

したがって、エラーはゲートウェイのプロセスに送信され、splitの場合はクラッシュします。

h.header("errorChannel", "syncErrorChannel", true)で実際にそのヘッダーを無効にすることをおすすめします。

+0

ありがとうございました。 'c - > c.executor()'をキューと違うものにしていますか?これはXMLから変換したDSLですが、以前はSI DSLを使用していませんでした。 –

+0

大きな高さから、それを処理する空きスレッドがない場合は、内部キューに格納されているエグゼキュータのタスクをフィードするので、実際にはキューになります。ポーリング固有の動作のためにQueueChannelとは何を話していますか –

関連する問題