2016-12-30 18 views
2

私は私が持っているこれまでのところCompletableFutureStream を使用して、同時にいくつかのデータ量を処理しようとしている:Javaの8 CompletableFuture、ストリームとタイムアウト

public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("start"); 

    List<String> collect = Stream.of("1", "2", "3", "4", "5", 
      "6", "7") 
      .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) 
      .collect(Collectors.toList()) 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    System.out.println("stop out!"); 
} 


public static Supplier<String> getStringSupplier(String text) { 
    return() -> { 

     System.out.println("start " + text); 
     try { 
      TimeUnit.SECONDS.sleep(2); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("stop " + text); 
     return "asd" + text; 
    }; 
} 

と出力罰金です:

start start 1 start 4 start 3 start 2 start 5 start 6 start 7 stop 4 stop 1 stop 5 stop 2 stop 6 stop 3 stop 7 stop out!

しかし、今私はそのジョブにタイムアウトを追加したいと思います。 1秒後にキャンセルする必要があります。 nullまたはその他の値をcollectリストに返します。 (私は原因を示す値を好むだろう)。

どうすれば実現できますか?

ありがとうございました。

+2

を導入しましたcompleteOnTimeout方法を、使用することができます(http ://download.java.net/java/jdk9/docs/api/java/util/concurrent/CompletableFuture.html#completeOnTimeout-T-long-java.util.concurrent.TimeUnit-) – eee

+0

これまでのところ(おそらくしばらくの間)私の会社はjava 8と私はそれを使用することはできませんので、私は使用できません:) – user2377971

答えて

2

それを行う方法を私が発見した:

private static final ScheduledExecutorService scheduler = 
     Executors.newScheduledThreadPool(
       1, 
       new ThreadFactoryBuilder() 
         .setDaemon(true) 
         .setNameFormat("failAfter-%d") 
         .build()); 

public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("start"); 
    final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1)) 
      .exceptionally(xxx -> "timeout exception"); 
    List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") 
      .map(x -> CompletableFuture.anyOf(createTaskSupplier(x) 
        , oneSecondTimeout)) 
      .collect(Collectors.toList()) 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    System.out.println("stop out!"); 
    System.out.println(collect); 
} 

public static CompletableFuture<String> createTaskSupplier(String x) { 
    return CompletableFuture.supplyAsync(getStringSupplier(x)) 
      .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage()); 
} 


public static Supplier<String> getStringSupplier(String text) { 
    return() -> { 

     System.out.println("start " + text); 
     try { 
      TimeUnit.MILLISECONDS.sleep(100); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     if (text.equals("1")) { 
      throw new RuntimeException("LOGIC ERROR"); 
     } 
     try { 
      if (text.equals("7")) 
       TimeUnit.SECONDS.sleep(2); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("stop " + text); 
     return "result " + text; 
    }; 
} 

public static <T> CompletableFuture<T> failAfter(Duration duration) { 
    final CompletableFuture<T> promise = new CompletableFuture<>(); 
    scheduler.schedule(() -> { 
     final TimeoutException ex = new TimeoutException("Timeout after " + duration); 
     return promise.completeExceptionally(ex); 
    }, duration.toMillis(), MILLISECONDS); 
    return promise; 
} 

それが返されます。

start 
start 1 
start 3 
start 4 
start 2 
start 5 
start 6 
start 7 
stop 6 
stop 4 
stop 3 
stop 5 
stop 2 
stop out! 
[PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]` 

をあなたはそれについてどう思いますか、あなたはその解決策のいずれかの欠陥を見つけることができますか? Javaの8に限定されるものではなく、他の人については、

0

別のCompletableFutureでジョブをラップすることができます。指定した時間を超過すると、TimeoutExceptionが発生します。特別に処理する場合は、TimeoutException catchブロックを分離できます。

List<String> collect = null; 
    try { 
     collect = CompletableFuture.supplyAsync(() -> 
       Stream.of("1", "2", "3", "4", "5", 
         "6", "7") 
         .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x))) 
         .collect(Collectors.toList()) 
         .stream() 
         .map(CompletableFuture::join) 
         .collect(Collectors.toList()) 
     ).get(5, TimeUnit.SECONDS); 
    } catch (InterruptedException | ExecutionException | TimeoutException e) { 
     e.printStackTrace(); 
     //separate out the TimeoutException if you want to handle it differently 

    } 

    System.out.println(collect); //would be null in case of any exception 
+0

その解決策の問題は、 'collect'リストが' null'なので、sucessfullの実行からデータを失っているので、 – user2377971

0

あなたは(CompletableFuture.supplyAsync(getStringSupplier(x)は、timeoutExecutorService))エグゼキュータのパラメータでCompletableFutureのオーバーロードさsupplyAsync方法を試すことができますし、timeoutExecutorServiceためlinkを参照することができます。

+0

あなたは正しいです働くことができます。しかし、その解決法は 'CompletableFuture'の代わりに' Feature'を使います。私は 'CompletableFuture'オブジェクトを使うことを好みます。なぜなら、私は新しいものを学びたいからです。 – user2377971

1

、あなたは `CompletableFuture#completeOnTimeout`で簡単にこれを行うことができますJavaの9でJava 9で

List<String> collect = Stream.of("1", "2", "3", "4", "5", "6", "7") 
     .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)) 
       .completeOnTimeout(null , 1, SECONDS)) 
     .filter(Objects::nonNull) 
     .collect(toList()) 
     .stream() 
     .map(CompletableFuture::join) 
     .collect(toList()); 
関連する問題