2016-05-23 16 views
1

1000のタスクのプールを起動して(最大4つは並列に実行できる)、3秒以上(個別に)タイムアウトすると自動的にタイムアウトする方法?Javaタイムアウト複数のタスクを並列に実行する

ExecutorServiceが役立つようですが(下の別の記事のSSCEを参照)、複数のタスクを並列で実行する方法はわかりません。future.get(3, TimeUnit.SECONDS)は同じスレッドで実行されています1つはタスクを起動するため、複数のタスクを並行して起動する機会はありません)。

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

public class Test { 
    public static void main(String[] args) throws Exception { 
     ExecutorService executor = Executors.newSingleThreadExecutor(); 
     Future<String> future = executor.submit(new Task()); 

     try { 
      System.out.println("Started.."); 
      System.out.println(future.get(3, TimeUnit.SECONDS)); 
      System.out.println("Finished!"); 
     } catch (TimeoutException e) { 
      future.cancel(true); 
      System.out.println("Terminated!"); 
     } 

     executor.shutdownNow(); 
    } 
} 

class Task implements Callable<String> { 
    @Override 
    public String call() throws Exception { 
     Thread.sleep(4000); // Just to demo a long running task of 4 seconds. 
     return "Ready!"; 
    } 
} 

ありがとう!

+0

「Executors.newSingleThreadExecutor()」は使用しないでください。「Executors.newFixedThreadPool(4)」を使用してください。 'Future.get()'の呼び出しを開始する前に、それらをすべて送信してください。 –

答えて

0

あなたはそれがタイムアウト時間を超えた場合に、それを殺すために、各タスクを監視する必要があり、どちらか

  1. 場合はタスク自身が作成する必要があり、時間を追跡し、適切に終了したか、
  2. ウォッチドッグスレッドすべてタスクです。ウォッチドッグスレッドはタイマーを設定し、スリープします。タイムアウト間隔が経過するとスリープ状態に戻り、タスクがまだ実行中の場合は終了します。
+0

オプション1は、外部依存関係のためにスレッドがフリーズする場合のタイムアウトと同様に実行できません。オプション2の最良のアプローチは何でしょうか? – Tom

+0

私はオプション2をより良く説明する方法がわかりません。タスクをサブミットするときには、そのジョブがタイムアウト期間中スリープ状態になり、起床し、プライマリ・タスクがまだ実行中の場合は終了させるもう1つのスレッド(別のエグゼキュータで)を作成する必要があります。 –

+0

ここでは、TimerとTimerTaskを使用するのが最善であると分かりました。しかし、私が不明なところでは、Thread.stop()が推奨されなくなったように、スレッドをどのように削除するかが分かります。 – Tom

-1

これはトリッキーなものです。私はそれはロックと条件の代わりに、同期を使用して、よりきれいに行うことができる疑い

public class TaskQueue<T> { 
    private static final Logger logger = 
     Logger.getLogger(TaskQueue.class.getName()); 

    private final Collection<Callable<T>> tasks; 

    private final int maxTasks; 

    private int addsPending; 

    private final Collection<T> results = new ArrayList<T>(); 

    private final ScheduledExecutorService executor; 

    public TaskQueue() { 
     this(4); 
    } 

    public TaskQueue(int maxSimultaneousTasks) { 
     maxTasks = maxSimultaneousTasks; 
     tasks = new ArrayDeque<>(maxTasks); 
     executor = Executors.newScheduledThreadPool(maxTasks * 3); 
    } 

    private void addWhenAllowed(Callable<T> task) 
    throws InterruptedException, 
      ExecutionException { 

     synchronized (tasks) { 
      while (tasks.size() >= maxTasks) { 
       tasks.wait(); 
      } 
      tasks.add(task); 

      if (--addsPending <= 0) { 
       tasks.notifyAll(); 
      } 
     } 

     Future<T> future = executor.submit(task); 
     executor.schedule(() -> future.cancel(true), 3, TimeUnit.SECONDS); 
     try { 
      T result = future.get(); 
      synchronized (tasks) { 
       results.add(result); 
      } 
     } catch (CancellationException e) { 
      logger.log(Level.FINE, "Canceled", e); 
     } finally { 
      synchronized (tasks) { 
       tasks.remove(task); 
       if (tasks.isEmpty()) { 
        tasks.notifyAll(); 
       } 
      } 
     } 
    } 

    public void add(Callable<T> task) { 
     synchronized (tasks) { 
      addsPending++; 
     } 

     executor.submit(new Callable<Void>() { 
      @Override 
      public Void call() 
      throws InterruptedException, 
        ExecutionException { 
       addWhenAllowed(task); 
       return null; 
      } 
     }); 
    } 

    public Collection<T> getAllResults() 
    throws InterruptedException { 
     synchronized (tasks) { 
      while (addsPending > 0 || !tasks.isEmpty()) { 
       tasks.wait(); 
      } 
      return new ArrayList<T>(results); 
     } 
    } 

    public void shutdown() { 
     executor.shutdown(); 
    } 
} 

:ここに私が思い付いたものです。

関連する問題