2017-01-14 11 views
0

タイトルはちょっとしたカンバッサーですが、それがうまくいかない場合はもちろん私のせいです。Camel Rest DSL - AggregationStrategy strange behavior

rdbmsからsolrおよびmongo dbへのデータ転送を実行したいとします。

  • 取得し、顧客IDが詳細
  • 取得し、顧客の請求書を
  • 取得custometrsを転送する
  • 取得し、顧客の支払い
を: は、私は(例えば)以下の手順を完了する必要があり、それを行うために、

次に、mongo dbとsolrに集約してインデックスを作成します。ここで

は私のコードですが、私はそれを動作させることはできません。

from("seda:initial-data-transfer") 
     .setProperty("recipientList", simple("direct:details,direct:invoices,direct:payments")) 
     .setProperty("afterAggregate", simple("direct:mongodb,direct:solr")) 
     .setBody(constant("{{query.initial-data-transfer.ids}}")) 
     .to(jdbc) 
     .process(new RowSetIdsProcessor()) 
     .split().tokenize(",", 1000) // ~200k ids - group by 1000 ids 
     .to("direct:customers-ids"); 

from("direct:customers-ids") 
     .recipientList(exchangeProperty("recipientList").tokenize(",")) 
     // ? .aggregationStrategy(new CustomerAggregationStrategy()).parallelProcessing() 
     .aggregate(header("CamelCorrelationId"), new CustomerAggregationStrategy())    
     .completionPredicate(new CustomerAggregationPredicate()) // true if details + invoices + payments, etc .... 
     // maybe a timeOut here ? 
     .process(businessDataServiceProcessor) 
     .recipientList(exchangeProperty("afterAggregate").tokenize(",")); 

from("direct:details") 
     .setHeader("query", constant("{{query.details}}")) 
     .bean(SqlTransform.class,"detailsQuery").to(jdbc) 
     .process(new DetailsProcessor()); 

from("direct:invoices") 
     .setHeader("query", constant("{{query.invoices}}")) 
     .bean(SqlTransform.class,"invoicessQuery").to(jdbc) 
     .process(new InvoicesProcessor()); 

私はAggregationStrategyがどのように動作するか理解していません。 時々、私は1000のIDSの2つのまたは3ブロックを実行し、DBとSolrのをMongoのために保存しますが、すべての交換がaggregationStrategyで空になった後ことができます...

私は考えているの多くを試してみました..しかし、各時間が経過すると、集計は失敗します。あなたの助けのための

おかげ

アップデート:ここで

がCustomerAggregationStrategyの一部です:

public class CustomerAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     Message newIn = newExchange.getIn(); 

     CustomerDataCollector collector = null; 
     if (oldExchange == null) { 
      int completionSize = newExchange.getProperty("completionSize", Integer.class); 
      collector = new CustomerDataCollector(completionSize); 
      CollectData(collector, newIn, newExchange); 
      newIn.setBody(collector); 
      return newExchange; 
     } 
     collector = oldExchange.getIn().getBody(CustomerDataCollector.class); 
     CollectData(collector, newIn, newExchange); 
     return oldExchange; 
    } 

    private void CollectData(CustomerDataCollector collector, Message message, Exchange exchange) { 
     String recipientListEndpoint = (String)exchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT); 
     switch (recipientListEndpoint){ 
      case "direct://details" : 
       collector.setDetails(message.getBody(Map.class)); 
       break; 
      case "direct://invoices" : 
       collector.setInvoices(message.getBody(Map.class)); 
       break; 
      case "direct://payments" : 
       collector.setPayments(message.getBody(Map.class)); 
       break; 
     } 
    } 
} 

アップデート:

私はCustomerAggregationStrategyでこれをログに記録することができます

String camelCorrelationId = (String)exchange.getProperty(Exchange.CORRELATION_ID); 

[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://details ID-UC-0172-50578-1484523575668-0-5 
[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://invoices ID-UC-0172-50578-1484523575668-0-5 
[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://payments ID-UC-0172-50578-1484523575668-0-5 

期待どおりのCamelCorrelationIdの値と同じです。 私はCamelCorrelationIdが正しいと思います。それじゃない?

+0

「CamelCorrelationId」ヘッダーはどのように設定しますか?相関式はあなたの交換の間に共通に持つことができなければなりません.CorrelationIdは常に異なっていると私は思っています。 –

+0

...CustomerAggregationStrategyクラスのコードを記述することができれば(あるいは少なくとも論理を記述することができれば)、あなたのルートに何が悪いのかを改善するのに役立ちます。 –

+0

私はCamelCorrelationIDをこれに従って設定しません:[リンク](http://camel.apache.org/correlation-identifier.html)_ "一部のEIPパターンはサブメッセージをスピンオフします。そのような場合、Camelたとえば、Splitter、Multicast、Recipient List、Wire TapなどのExchangeとリンクするExchange.CORRELATION_IDというキーを持つプロパティとしてExchangeに相関IDを追加します。 RecipientList EIPはこれを行います。私はいくつかのコードで質問を更新しました。ご協力いただきありがとうございます。 –

答えて

0

これでよかったです。

tokeniszerの後、私はプロパティCustomCorrelationIdを次のように設定しました。このように、この値に

.split().tokenize(",", 1000) 
    .setProperty("CustomCorrelationId",header("breadcrumbId")) 
    .to("direct:customers-ids") 

と集計:この作品の罰金は、現在、データが正しく集計され

from("direct:customers-ids") 
      .recipientList(exchangeProperty("recipientList").tokenize(",")) 

    from("direct:details") 
      .setHeader("query", constant("{{query.details}}")) 
      .bean(SqlTransform.class,"detailsQuery").to(jdbc) 
      .process(new DetailsProcessor()) 
      .to("direct:aggregate");  

...

from("direct:aggregate").routeId("aggregate") 
      .log("route : ${routeId}") 
      .aggregate(property("CustomCorrelationId"), new CustomAggregationStrategy()) 
      .completionPredicate(new CustomerAggregationPredicate()) 
      .process(businessDataServiceProcessor) 
      .recipientList(exchangeProperty("afterAggregate").tokenize(",")); 

。ご協力いただきありがとうございます。 あなたは道を指摘しました。