2012-06-28 76 views
11

Java(java.util.concurrent)でスレッドプールを実装する必要があります。スレッドプールの数は、アイドル時に最小限の値になり、上限に達しますさらに)ジョブが実行を完了するより速くそれに投入され、すべてのジョブが完了し、それ以上ジョブがサブミットされないときに下限に縮小されます。動的(拡大/縮小)スレッドプールの作成

どのようにそのようなものを実装しますか?私はこれがかなり一般的な使用シナリオであると思いますが、明らかに多くのジョブが送信されたときに固定サイズのプールとプールを作成できるのは、java.util.concurrent.Executorsのファクトリメソッドだけです。 ThreadPoolExecutorクラスはcorePoolSizemaximumPoolSizeのパラメータを提供しますが、そのドキュメントはcorePoolSizeスレッド以上を同時に持つ唯一の方法は境界ジョブキューを使用することを暗示しているようですが、maximumPoolSizeスレッドに達した場合は、あなたはあなた自身に対処しなければならない雇用拒否を得るでしょうか?私はこれを思いついた:

//pool creation 
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS, 
    new ArrayBlockingQueue<Runnable>(minSize)); 
... 

//submitting jobs 
for (Runnable job : ...) { 
    while (true) { 
     try { 
      pool.submit(job); 
      System.out.println("Job " + job + ": submitted"); 
      break; 
     } catch (RejectedExecutionException e) { 
      // maxSize jobs executing concurrently atm.; re-submit new job after short wait 
      System.out.println("Job " + job + ": rejected..."); 
      try { 
       Thread.sleep(300); 
      } catch (InterruptedException e1) { 
      } 
     } 
    } 
} 

私は何か見落としていますか?これを行うより良い方法はありますか?また、要件に応じて、上記のコードが少なくとも完了するまで問題が生じる可能性があります。(total number of jobs) - maxSizeジョブが終了しました。だから、あなたがプールに任意の数のジョブを提出し、それらのいずれかが終了するのを待たずにすぐに進むことができるようにしたいのであれば、私はそれをどうやって行うことができるのか分かりません。発行されたすべてのジョブを保持するために必要な無制限のキューAFAICSでは、ThreadPoolExecutor自体に無制限のキューを使用している場合、そのスレッド数はcorePoolSizeを超えて増加することはありません。

+3

私は、動的サイズのスレッドプールの有用性を見ることはできません。アプリケーションの稼働時間中にボード上のプロセッサ数が変化しますか? – corsiKa

+3

'newCachedThreadPool'があなたの状況に適していないのはなぜですか?もはや使用されなくなったスレッドを自動的に無効にします。 – Tudor

+0

あなたのアイドルスレッドが死んでいない場合はどうなりますか?常に最大サイズの固定サイズのプールを持っていたとしますか?何が起こるのだろうか? –

答えて

4

同じスレッドを使用するRejectedExecutionHandlerを割り当てて、ブロックキューにジョブを送信するのに役立つトリックです。これは現在のスレッドをブロックし、何らかのループの必要性を取り除きます。

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

はここでその答えからコピー拒否ハンドラです:

は、ここに私の答えを参照してください。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 
     0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will call the rejected 
// handler when you submit the 201st job, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full 
     executor.getQueue().put(r); 
    } 
}); 

その後、限り、あなたはどのスレッドがコアスレッド上に作成される前に、あなたは最初を使用するバウンド形式のブロッキングキューがいっぱいになることを実感としてカウントコア/最大スレッドを利用することができるはずです。したがって、コアスレッドが10個あり、11番目のジョブを11番目のスレッドに開始させたい場合は、残念ながら(おそらくSynchronousQueue)サイズ0のブロッキングキューを持つ必要があります。私はこれが他の点では偉大なExecutorServiceクラスの本当の限界だと感じています。

1

セットmaximumPoolSizeからInteger.MAX_VALUEです。もしあなたが20億以上の糸を持っていれば...それでは、幸いです。とにかく

ThreadPoolExecutor状態のJavadoc:Integer.MAX_VALUEなど、本質的に無制限の値にmaximumPoolSizeを設定することにより

、あなたはプールが同時タスクの任意の数に対応することができます。最も一般的には、コアおよび最大プールサイズは構築時にのみ設定されますが、setCorePoolSize(int)およびsetMaximumPoolSize(int)を使用して動的に変更することもできます。

LinkedBlockingQueueのような無制限のタスクキューでは、これは任意の大きな容量を持つ必要があります。

+0

downvoterは説明してくれませんか? –

+0

ありがとうございますこれも参照してくださいhttp://stackoverflow.com/questions/28567238/threadpoolexecutor-does-not-shrink-properly/40384042#40384042他の質問で解決された他の問題を解消 – Vahid

+0

彼はそれを束縛したいと思っていました。 .. – Xerus

8

成長と収縮がスレッドと一緒になると、私の頭には1つの名前しかありません。CachedThreadPool from java.util.concurrent package。

ExecutorService executor = Executors.newCachedThreadPool(); 

CachedThreadPoolは()を必要なときに新しいスレッドを作成するスレッドだけでなく、を再利用することができます。 そして、スレッドが60秒間アイドル状態の場合、CachedThreadPoolはそれを強制終了します。これはかなり軽量です - あなたの言葉で成長し、収縮します!

+6

右ですが、それは限定されていません。 – Gray

+0

基礎となるThreadPoolExecutorとsetup maximumPoolSizeを手動で使用することも、実行時に設定することもできます –

関連する問題