2016-04-03 18 views
1

boundedExecutorを作成したいが、固定数のスレッドのみを並列実行できる。より多くのタスクが追加されると、エグゼキュータは他のスレッドが完了するまでブロックします。executorService throw SynchronousQueueを使用したRejectException

ここに私は他の質問で見つけた実行者です。

public class BoundedExecutor extends ThreadPoolExecutor { 

    private final Logger logger = LogManager.getLogger(BoundedExecutor.class); 
    private final Semaphore semaphore; 

    public BoundedExecutor(int bound){ 
     super(bound, bound, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); 
     this.semaphore = new Semaphore(bound); 
    } 

    @Override 
    public void execute(Runnable task) { 
     try { 
      semaphore.acquire(); 
      super.execute(task); 
     } catch (InterruptedException e) { 
      logger.error("interruptedException while acquiring semaphore"); 
     } 
    } 

    protected void afterExecute(final Runnable task, final Throwable t){ 
     super.afterExecute(task, t); 
     semaphore.release(); 
    } 
} 

と私はコードがシングルスレッドを作り、順次タスクを実行しますが、実際には、ときに最初のタスクの完全な、エグゼキュータはjava.util.concurrent.RejectedExecutionExceptionスロー思っ

public static void main(String[] args) throws Exception { 

     Runnable task =() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(Thread.currentThread().getName() + " complete."); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }; 
     BoundedExecutor pool = new BoundedExecutor(1); 
     for(int i = 0; i < 10; i++){ 
      pool.execute(task); 
     } 
     pool.shutdown(); 
    } 

メインコード。

私が理解したように、semaphore.acquire()は最初のタスクが完了してセマフォをリリースするまでスレッドをブロックしますが、コードに何が問題なのですか?

+0

なぜあなたはSynchronousQueueでセマフォを使用しているセマフォを使用してキューのブロックを作っていないでしょうか?これで何を達成しようとしていますか? –

+0

セマフォは、プールが一杯になったときにThreadExecutor add newタスクをブロックするために使用されます。 プールが一杯になったときにタスクをキューに入れたくないので、synchronousQueueが使用されます。 – iceshi

+0

SynchronousQueueを使用すると、フリースレッドがないときはいつでもブロックされます。セマフォはこれに何を追加しますか? –

答えて

2

私は

public static void main(String[] args) { 
    SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>() { 
     @Override 
     public boolean offer(Runnable runnable) { 
      try { 
       return super.offer(runnable, 1, TimeUnit.MINUTES); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       return false; 
      } 
     } 
    }; 
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, queue); 
    for (int i = 0; i < 10; i++) { 
     final int finalI = i; 
     pool.execute(() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(LocalTime.now() + " - " + Thread.currentThread().getName() + " " + finalI + " complete"); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }); 
    } 
    pool.shutdown(); 
} 

プリント

13:24:14.241 - pool-1-thread-1 0 complete 
13:24:15.247 - pool-1-thread-1 1 complete 
13:24:16.247 - pool-1-thread-1 2 complete 
13:24:17.247 - pool-1-thread-1 3 complete 
13:24:18.248 - pool-1-thread-1 4 complete 
13:24:19.248 - pool-1-thread-1 5 complete 
13:24:20.248 - pool-1-thread-1 6 complete 
13:24:21.248 - pool-1-thread-1 7 complete 
13:24:22.249 - pool-1-thread-1 8 complete 
13:24:23.249 - pool-1-thread-1 9 complete 
+0

注意: 'corePoolSize'以下のワーカースレッドの数 – louxiu

関連する問題