2012-10-08 45 views
5

Javaはバッチ処理を処理するキューオブジェクトまたはメカニズムをサポートしていますか?バッチ処理を処理するJavaキューオブジェクトまたはメカニズムがありますか?

例:私たちは待ち行列(または任意の待ち行列オブジェクト)を持っていますが、プロデューサは待ち行列に1つずつ項目をプッシュします。私の目標は、この項目に10個の項目または10個以上の項目があるときです。ハンドラを1つのバッチで処理します。

自動的にトリガされないため、ハンドラ側でキューを正常にループさせる方法を見つける必要があります。

これを処理するのに典型的で高性能のオブジェクトまたはライブラリがありますか?

おかげで、 エムレ

答えて

0

は、いくつかの実装を持つインタフェースjava.util.QueueのAPIドキュメント、を見てください。

また、さまざまなプロセス間でメッセージを交換するためのキューイングシステムを扱う標準APIのJava Message Service (JMS)もあります。

+1

これはどのようにバッチ処理の問題に対処していますか?私はそうは思わない。 –

0

私はCountDownLatchが必要なものか、おそらくCyclicBarrierだと思います。これにより、一定数の操作が発生した後にコンシューマをトリガする同期ポイントを設定でき、標準キューをコンテナオブジェクトとして使用できます。

+0

あなたはいくつかのコードスニペットを提供してください、思考を理解するのは良いでしょう。 –

+1

'CountDownLatch'は、いくつかのスレッドを生成しているか、または複数のスレッドを消費している(またはその両方)場合に特に便利です。それがここにあるのは明らかではない。 –

2

待ち行列でのバッチ処理は、待機/通知によって達成できる可能性があります。使用可能かどうかに関わらず、リソースに対するスレッドコールをブロックするようなものです。

public class MyQueue implements Queue<Object>{ 
     public synchronized List<Object> peek() { 
     if(this.list.size()>=10) 
        this.list.wait(); 
     return Collections.subList(0,10); 
    } 
     @Override 
    public boolean add(Object e) { 
     this.list.add(e); 
       if(this.list.size()>=10) 
        this.list.notifyAll(); 
     return false; 
    } 
} 

それはあなたが指定した時間外で待って呼び出すことができます自動的に

その場合にトリガされません。

2

BlockingQueue.drainTo()を使用すると、実行するタスクのバッチを自動的に取得できます。これは毎秒100Kを超えるタスクに適しています。

より高いパフォーマンスのキューイングが必要な場合は、より複雑なDisruptorまたはJava Chronicleを使用して、1秒間に何百万ものタスクにキューイングすることができます。ここで

+0

つまり、メソッド側で実装する必要があります。 BlockingQueue.drainTo()を実行するループを追加して、キュー項目を含むリストを取得し、プロセッサを呼び出して処理します。 –

+0

あなたはそれを行うことができます。生産者と消費者の両方に方法があります;) –

1

は、収集するために、バックグラウンドスレッドを使用して、バッチで処理オブジェクトを簡単に試みだとプロセスオブジェクトは、他のスレッドによってキューにプッシュ:

public abstract class Batcher<E> implements Runnable { 

    public static interface BatchProcessor<E> { 
     public void processBatch(List<E> batch); 
    } 

    private final BlockingQueue<E> queue; 
    private final BatchProcessor<E> processor; 

    private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) { 
     this.queue = queue; 
     this.processor = processor; 
    } 

    @Override 
    public void run() { 
     try { 
      while (true) { 
       List<E> batch = new ArrayList<E>(); 
       for (int i = 0; i < 10; i++) { 
        batch.add(queue.take()); 
       } 
       processor.processBatch(batch); 
      } 
     } catch (InterruptedException e) { 
      return; 
     } 
    } 

} 

これを使用するには、BlockingQueueを作成し、上のオブジェクトを置きますバッチを処理するためのBatchProcessorのインプリメンテーションのインスタンスを作成し、次にオブジェクトを前者から後者にポンピングするBatcherのインスタンスを作成します。

関連する問題