タイトルはちょっとしたカンバッサーですが、それがうまくいかない場合はもちろん私のせいです。Camel Rest DSL - AggregationStrategy strange behavior
rdbmsからsolrおよびmongo dbへのデータ転送を実行したいとします。
- 取得し、顧客IDが詳細
- 取得し、顧客の請求書を
- 取得custometrsを転送する
- 取得し、顧客の支払い
次に、mongo dbとsolrに集約してインデックスを作成します。ここで
は私のコードですが、私はそれを動作させることはできません。
from("seda:initial-data-transfer")
.setProperty("recipientList", simple("direct:details,direct:invoices,direct:payments"))
.setProperty("afterAggregate", simple("direct:mongodb,direct:solr"))
.setBody(constant("{{query.initial-data-transfer.ids}}"))
.to(jdbc)
.process(new RowSetIdsProcessor())
.split().tokenize(",", 1000) // ~200k ids - group by 1000 ids
.to("direct:customers-ids");
from("direct:customers-ids")
.recipientList(exchangeProperty("recipientList").tokenize(","))
// ? .aggregationStrategy(new CustomerAggregationStrategy()).parallelProcessing()
.aggregate(header("CamelCorrelationId"), new CustomerAggregationStrategy())
.completionPredicate(new CustomerAggregationPredicate()) // true if details + invoices + payments, etc ....
// maybe a timeOut here ?
.process(businessDataServiceProcessor)
.recipientList(exchangeProperty("afterAggregate").tokenize(","));
from("direct:details")
.setHeader("query", constant("{{query.details}}"))
.bean(SqlTransform.class,"detailsQuery").to(jdbc)
.process(new DetailsProcessor());
from("direct:invoices")
.setHeader("query", constant("{{query.invoices}}"))
.bean(SqlTransform.class,"invoicessQuery").to(jdbc)
.process(new InvoicesProcessor());
私はAggregationStrategyがどのように動作するか理解していません。 時々、私は1000のIDSの2つのまたは3ブロックを実行し、DBとSolrのをMongoのために保存しますが、すべての交換がaggregationStrategyで空になった後ことができます...
私は考えているの多くを試してみました..しかし、各時間が経過すると、集計は失敗します。あなたの助けのための
おかげ
アップデート:ここで
がCustomerAggregationStrategyの一部です:
public class CustomerAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
CustomerDataCollector collector = null;
if (oldExchange == null) {
int completionSize = newExchange.getProperty("completionSize", Integer.class);
collector = new CustomerDataCollector(completionSize);
CollectData(collector, newIn, newExchange);
newIn.setBody(collector);
return newExchange;
}
collector = oldExchange.getIn().getBody(CustomerDataCollector.class);
CollectData(collector, newIn, newExchange);
return oldExchange;
}
private void CollectData(CustomerDataCollector collector, Message message, Exchange exchange) {
String recipientListEndpoint = (String)exchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT);
switch (recipientListEndpoint){
case "direct://details" :
collector.setDetails(message.getBody(Map.class));
break;
case "direct://invoices" :
collector.setInvoices(message.getBody(Map.class));
break;
case "direct://payments" :
collector.setPayments(message.getBody(Map.class));
break;
}
}
}
アップデート:
私はCustomerAggregationStrategyでこれをログに記録することができます
String camelCorrelationId = (String)exchange.getProperty(Exchange.CORRELATION_ID);
[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://details ID-UC-0172-50578-1484523575668-0-5
[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://invoices ID-UC-0172-50578-1484523575668-0-5
[t-AggregateTask] .i.c.a.CustomerAggregationStrategy : CustomerAggregationStrategy.CollectData : direct://payments ID-UC-0172-50578-1484523575668-0-5
期待どおりのCamelCorrelationIdの値と同じです。 私はCamelCorrelationIdが正しいと思います。それじゃない?
「CamelCorrelationId」ヘッダーはどのように設定しますか?相関式はあなたの交換の間に共通に持つことができなければなりません.CorrelationIdは常に異なっていると私は思っています。 –
...CustomerAggregationStrategyクラスのコードを記述することができれば(あるいは少なくとも論理を記述することができれば)、あなたのルートに何が悪いのかを改善するのに役立ちます。 –
私はCamelCorrelationIDをこれに従って設定しません:[リンク](http://camel.apache.org/correlation-identifier.html)_ "一部のEIPパターンはサブメッセージをスピンオフします。そのような場合、Camelたとえば、Splitter、Multicast、Recipient List、Wire TapなどのExchangeとリンクするExchange.CORRELATION_IDというキーを持つプロパティとしてExchangeに相関IDを追加します。 RecipientList EIPはこれを行います。私はいくつかのコードで質問を更新しました。ご協力いただきありがとうございます。 –