2012-04-23 9 views
1

私はラクダでいくつかの小さなプロジェクトを行ったことがありますが、私が理解しようとしているのは、大きなデータ(メモリに収まらない)ラクダルートで消費するとき。Apache Camel:JMSエンドポイントにルーティングされたデータベースからのGBのデータ

私は、私がラクダを使ってルーティングしたいと思う数GBのデータを含むデータベースを持っています。明らかに、すべてのデータをメモリに読み込むことはオプションではありません。

私はこれをスタンドアロンのアプリケーションとして実行していた場合、データをページングしてチャンクをJMS enpointに送信するコードを持っていました。私はそれが良いパターンを提供するようにラクダを使用したいと思います。私がファイルから消費していたら、私はstreaming()コールを使うことができました。

また、camel-sql/camel-jdbc/camel-jpaを使用するか、Beanを使用してデータベースから読み込む必要があります。

誰もが私と一緒にいることを願っています。私はJava DSLをよく知っていますが、人々が提供できる助言や提案は感謝しています。

更新:2-MAY-2012

だから私はこれで遊ぶためにいくつかの時間を持っていたし、私は実際に私が使用できるように、プロデューサーの概念を乱用されてやっていると思いますそれはルートにあります。

public class MyCustomRouteBuilder extends RouteBuilder { 

    public void configure(){ 
     from("timer:foo?period=60s").to("mycustomcomponent:TEST"); 

     from("direct:msg").process(new Processor() { 
       public void process(Exchange ex) throws Exception{ 
        System.out.println("Receiving value" : + ex.getIn().getBody()); 
       } 
     } 
    } 

} 

私のプロデューサーは次のようになります。明確にするために、私はCustomEndpointまたはCustomComponentを含めていません。薄いラッパーのようです。

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e; 
    CamelContext c; 

    public MyCustomProducer(Endpoint epoint){ 
      super(endpoint) 
      this.e = epoint; 
      this.c = e.getCamelContext(); 
    } 

    public void process(Exchange ex) throws Exceptions{ 

     Endpoint directEndpoint = c.getEndpoint("direct:msg"); 
     ProducerTemplate t = new DefaultProducerTemplate(c); 

     // Simulate streaming operation/chunking of BIG data. 
     for (int i=0; i <20 ; i++){ 
      t.start(); 
      String s ="Value " + i ;     
      t.sendBody(directEndpoint, value) 
      t.stop();   
     } 
    } 
} 

まず、上記はあまりきれいに見えません。これを実行する最もクリーンな方法は、私のラクダルートが消費するスケジューリングされたquartzジョブを介してjmsキュー(direct:msgの代わりに)を投入して、私のラクダで受け取ったメッセージサイズよりも柔軟にすることができるようですパイプライン。しかし、私はルートの一環として時間ベースのアクティベーションを設定するという意味を非常に気に入っていました。

これを行う最善の方法については、誰も考えていませんか?私の理解で

答えて

0

、すべてを行う必要がある:

from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" + 
    "&maximumResults=150" + 
    "&consumeDelete=false") 
.to("jms:queue:entities"); 

maximumResultsは、クエリごとに取得する方法多くの企業の限界を定義します。

エンティティインスタンスの処理が終了したら、e.processed = true;persist()を設定して、エンティティが再び処理されないようにする必要があります。そうする

一つの方法は、@Consumedアノテーションである:

class SomeEntity { 
    @Consumed 
    public void markAsProcessed() { 
     setProcessed(true); 
    } 
} 

もう一つは、あなたがキューに送信する前に、エンティティをシリアル化する方法であると注意する必要があります。 fromとtoの間にがさらに豊富なを使用する必要があります。

+0

アクションブックでラクラクを見た後、私はこれを前もって与えるか、カスタムコンポーネントを使用してより多くのコントロールを得ることができると思います。私はカスタムコンバータ/変換を使用しているので、変換は問題になるはずです。 – K2J

+0

私が有用だと分かったのは、エンティティのプライマリキーだけをシリアル化し、実際のエンティティを取得するための処理ロジックを残すことです。したがって、複雑なカスタム変換を行う必要はありません。最後に、エンティティはそのキーによって区別されます。 –

+0

Apache Nifiを使用してみることもできます。これは、この使用例に適しているようです。 – Pushkin

関連する問題