2015-11-25 7 views
19

私はCompletableFutureをいくつか持っていて、それらを並行して実行したいのですが、最初に返すのはです。通常はです。CompletableFuture:最初に正常に戻るのを待っていますか?

は私が返すことが最初のを待つためにCompletableFuture.anyOfを使用することができます知っているが、これは通常または非常にを返します。私は例外を無視したい。

List<CompletableFuture<?>> futures = names.stream().map(
    (String name) -> 
    CompletableFuture.supplyAsync(
    () -> 
     // this calling may throw exceptions. 
     new Task(name).run() 
    ) 
).collect(Collectors.toList()); 
//FIXME Can not ignore exceptionally returned takes. 
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{})); 
try { 
    logger.info(any.get().toString()); 
} catch (Exception e) { 
    e.printStackTrace(); 
} 

答えて

0

これは、フレームワークによってサポートされるべき方法です。最初に、私はCompletionStage.applyToEitherが何か類似していると思ったが、それはそれが分からない。だから私はこの解決策を考え出した:

public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) { 
    final int count = stages.size(); 
    if (count <= 0) { 
    throw new IllegalArgumentException("stages must not be empty"); 
    } 
    final AtomicInteger settled = new AtomicInteger(); 
    final CompletableFuture<U> future = new CompletableFuture<U>(); 
    BiConsumer<U, Throwable> consumer = (val, exc) -> { 
    if (exc == null) { 
     future.complete(val); 
    } else { 
     if (settled.incrementAndGet() >= count) { 
     // Complete with the last exception. You can aggregate all the exceptions if you wish. 
     future.completeExceptionally(exc); 
     } 
    } 
    }; 
    for (CompletionStage<U> item : stages) { 
    item.whenComplete(consumer); 
    } 
    return future; 
} 

ここでは、行動でそれを確認するには、いくつかの使い方です:

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.function.BiConsumer; 

public class Main { 
    public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) { 
    final int count = stages.size(); 
    if (count <= 0) { 
     throw new IllegalArgumentException("stages must not be empty"); 
    } 
    final AtomicInteger settled = new AtomicInteger(); 
    final CompletableFuture<U> future = new CompletableFuture<U>(); 
    BiConsumer<U, Throwable> consumer = (val, exc) -> { 
     if (exc == null) { 
     future.complete(val); 
     } else { 
     if (settled.incrementAndGet() >= count) { 
      // Complete with the last exception. You can aggregate all the exceptions if you wish. 
      future.completeExceptionally(exc); 
     } 
     } 
    }; 
    for (CompletionStage<U> item : stages) { 
     item.whenComplete(consumer); 
    } 
    return future; 
    } 

    private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(); 

    public static <U> CompletionStage<U> delayed(final U value, long delay) { 
    CompletableFuture<U> future = new CompletableFuture<U>(); 
    worker.schedule(() -> { 
     future.complete(value); 
    }, delay, TimeUnit.MILLISECONDS); 
    return future; 
    } 
    public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) { 
    CompletableFuture<U> future = new CompletableFuture<U>(); 
    worker.schedule(() -> { 
     future.completeExceptionally(value); 
    }, delay, TimeUnit.MILLISECONDS); 
    return future; 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
    System.out.println("Started..."); 

    /* 
    // Looks like applyToEither doesn't work as expected 
    CompletableFuture<Integer> a = CompletableFuture.completedFuture(99); 
    CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture(); 
    System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc 
    */ 

    try { 
     List<CompletionStage<Integer>> futures = new ArrayList<>(); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200)); 
     futures.add(delayed(1, 1000)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400)); 
     futures.add(delayed(2, 500)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600)); 
     Integer value = firstCompleted(futures).toCompletableFuture().get(); 
     System.out.println("Completed normally: " + value); 
    } catch (Exception ex) { 
     System.out.println("Completed exceptionally"); 
     ex.printStackTrace(); 
    } 

    try { 
     List<CompletionStage<Integer>> futures = new ArrayList<>(); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400)); 
     futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200)); 
     Integer value = firstCompleted(futures).toCompletableFuture().get(); 
     System.out.println("Completed normally: " + value); 
    } catch (Exception ex) { 
     System.out.println("Completed exceptionally"); 
     ex.printStackTrace(); 
    } 

    System.out.println("End..."); 
    } 

} 
7

あなたは、次のヘルパーメソッドを使用ことがあります

public static <T> 
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) { 

    CompletableFuture<T> f=new CompletableFuture<>(); 
    Consumer<T> complete=f::complete; 
    l.forEach(s -> s.thenAccept(complete)); 
    return f; 
} 

をこのように使用すると、以前の例外は無視されますが、最初に指定された値が返されます。

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
     () -> { throw new RuntimeException("failing immediately"); } 
    ), 
    CompletableFuture.supplyAsync(
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); 
      return "with 5s delay"; 
     }), 
    CompletableFuture.supplyAsync(
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10)); 
      return "with 10s delay"; 
     }) 
); 
CompletableFuture<String> c = anyOf(futures); 
logger.info(c.join()); 

この解決策の1つの欠点は、はありません。が完了した場合、すべて未完了であることです。成功した計算がある場合、最初の値を提供しますが、まったく成功した計算が存在しない場合、非常に失敗するソリューションは、もう少し複雑です:

public static <T> 
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) { 

    CompletableFuture<T> f=new CompletableFuture<>(); 
    Consumer<T> complete=f::complete; 
    CompletableFuture.allOf(
     l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new) 
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; }); 
    return f; 
} 

それはallOf年代その並外れハンドラ事実を利用しますすべての先物が完了した(例外的にまたはそうでない)と、将来が1回だけ完了できる(obtrude…のような特別なものを除いて)ことができた後にのみ呼び出されます。例外ハンドラが実行されると、結果がある未来を完了しようとする試みがあった場合、それを完了しようとする試みは正常に完了しなかった場合にのみ例外的に成功する。

それは最初のソリューションと全く同じ方法を使用し、すべての計算が失敗した場合のみ、異なる挙動を示す、例えばすることができます。

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(
     () -> { throw new RuntimeException("failing immediately"); } 
    ), 
    CompletableFuture.supplyAsync(
     // delayed to demonstrate that the solution will wait for all completions 
     // to ensure it doesn't miss a possible successful computation 
     () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); 
      throw new RuntimeException("failing later"); } 
    ) 
); 
CompletableFuture<String> c = anyOf(futures); 
try { logger.info(c.join()); } 
catch(CompletionException ex) { logger.severe(ex.toString()); } 

の例では、上記のソリューションは、すべての補完のためにお待ちしておりますことを実証遅延を使用しています成功していない場合はthis example on ideoneと表示されます。結果のアイデアキャッシングにより、遅延に気付かないことがあります。

すべての先物が失敗した場合、どの例外が報告されるかについての保証はないことに注意してください。誤ったケースではすべての補完を待つので、最終的な結果にすることができます。それを考えると

+0

私たち[チャットでこのディスカッションを続行](http://chat.stackoverflow.com/rooms/97948/discussion-between-basilevs-and-holger)。 – Basilevs

+0

@Basilevs:回答を拡大しました – Holger

4

:ジャワの哲学の基礎の

  1. 一つは悪いプログラミングプラクティスを防止または阻止するためです。

    (それはそうすることに成功してきたどの程度は別の議論の対象である;。ポイントは、やはり、これは紛れもなく、言語の主要な目的の一つとなっていることを表します)

  2. 例外を無視します非常に悪い習慣です。

    例外は常に上の層のいずれか再スローなければならない、またはは、を処理し、または報告非常に少なくとも。具体的には、を静かに飲み込まないでください。

  3. エラーはできるだけ早く報告する必要があります。例えば

    、ランタイムはを反復処理しながら、コレクションが変更された場合にConcurrentModificationExceptionを投げる速い反復子を失敗提供するために経由する痛みを参照してください。非常に完成CompletableFutureを無視

  4. a)は、あなたができるだけ早い時点でエラーを報告していない、およびb)あなたがすべてでそれを報告しないようにそう計画であることを意味します。

  5. 例外的に完了したアイテムをリストから削除することができるため、最初の非例外的な完了を単純に待たずに、例外的な完了を気にする必要はありません。同時に、失敗を報告することを忘れないで、)と待ち時間を繰り返す。

求め-のための機能が意図的は、Javaから欠落している、と私はそれが合法的に不足しているであると主張していることをいとわない場合、私は、したがって、驚くことではないでしょう。

(申し訳ありませんソティリオス、何の標準的な答えはありません。)

+0

代替情報源(ホットスワップ可能なバックアップや負荷分散クラスタなど)を検討してください。ソースが交換可能であり、時には失敗することが知られており、応答に多くの時間を要する場合、いくつかのエラーを無視することは完全に合法であり望ましい。 – Basilevs

+0

@Basilevs trueですが、それでもログに記録し、ログメッセージを無視するのが最善です。記録がないどんな種類の失敗も良い考えではありません。 –

-2

この作品はウィル?正常に完了したすべての先物のストリームを返し、そのうちの1つを返します。

futures.stream() 
    .filter(f -> { 
    try{ 
     f.get(); 
     return true; 
    }catch(ExecutionException | InterruptedException e){ 
     return false; 
    } 
    }) 
    .findAny(); 
関連する問題