2016-10-29 2 views
0

私は4つの入出力操作を持っています:ABCおよびDです。それぞれはvertx.executeBlockingで実行する必要があります。どのように私は、この動作を達成することができますVert.xはvertx.executeBlockingから約束します

//PSEUDOCODE 
waitForExecuteBlocking(A_OPERATION); 
thenWaitForAllExecuteBlocking(`B_OPERATION`, `C_OPERATION`, `D_OPERATION`) 
/* do something */ 

:私は次の行動を持っている必要がありますか?

solutioinがVertx Rxに見つかりません。私の*_OPERATIONクラスをワーカーの頂点としてラップしたくない理由があります。

答えて

1

私は別の答えを追加しています、今回は先物です。
まず、Javaの未来ではなく、Vertx先物です。正しいインポートを使用してください。コードに今

// I'm running in main(), so everything is static, just for the sake of example 
private static Vertx vertx = Vertx.vertx(); 
public static void main(String[] args) throws InterruptedException { 


    long start = System.currentTimeMillis(); 

    // In your case it should be operationA(), operationB(), etc 
    // But I wanted to make the code shorter 
    CompositeFuture.all(operationA(), operationA(), operationA()).setHandler((r) -> { 
     if (r.succeeded()) { 
      // You can even iterate all the results 
      List<String> results = r.result().list(); 
      for (String result : results) { 
       System.out.println(result); 
      } 
      // This will still print max(operationA, operationB, operationC) 
      System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 
     } 
     else { 
      System.out.println("Something went wrong"); 
     } 
    }); 
} 


// Return a future, then fulfill it after some time 
private static Future<String> operationA() { 
    Future<String> future = Future.future(); 

    long millis = 1000 + ThreadLocalRandom.current().nextInt(500); 
    vertx.setTimer(millis, (l) -> { 
     future.complete("All is good " + millis); 
    }); 

    return future; 
} 
1

私は答えを2つに分けます。 RxJavaに依存するのではなく、通常のJavaだけに依存します。
まず、より複雑な例に今A_OPERATION

Vertx vertx = Vertx.vertx(); 

    CountDownLatch latch = new CountDownLatch(1); 

    Long start = System.currentTimeMillis(); 

    vertx.deployVerticle(new AbstractVerticle() { 

     @Override 
     public void start() throws InterruptedException { 
      // Just to demonstrate 
      Thread.sleep(1000); 

      latch.countDown(); 
     } 
    }); 

    // Always use await with timeout 
    latch.await(2, TimeUnit.SECONDS); 

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 

待つ:

public static void main(String[] args) throws InterruptedException { 

    Vertx vertx = Vertx.vertx(); 

    // This should be equal to number of operations to complete 
    CountDownLatch latch = new CountDownLatch(3); 

    Long start = System.currentTimeMillis(); 

    // Start your operations 
    vertx.deployVerticle(new BlockingVerticle(latch)); 
    vertx.deployVerticle(new BlockingVerticle(latch)); 
    vertx.deployVerticle(new BlockingVerticle(latch)); 

    // Always use await with timeout 
    latch.await(2, TimeUnit.SECONDS); 

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 
} 


private static class BlockingVerticle extends AbstractVerticle { 

    private final CountDownLatch latch; 

    public BlockingVerticle(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    @Override 
    public void start() throws InterruptedException { 


     long millis = 1000 + ThreadLocalRandom.current().nextInt(500); 

     System.out.println("It will take me " + millis + " to complete"); 

     // Wait for some random time, but no longer that 1.5 seconds 
     Thread.sleep(millis); 

     latch.countDown(); 
    } 

} 

は、あなたがそのメインスレッドが最大(B_OPERATION、C_OPERATION、D_OPERATION)+少数のためにブロックされます注意してくださいミリ秒以上。

+0

が答えてくれてありがとう!しかし、それは同期のために余分な頂点を配置するのは私にとっては奇妙に思えます。私のアプリは '-ha'オプションで動作しているので、それらをデプロイしたくありません。つまり、各頂点のオーバヘッドはほとんどありません。 (私も嫌いな)解決策として 'vertx.executeBlocking'の中に' CountDownLatch'を渡すことができます。しかしvertxのdocsは、vertexはあなたのコードを同期コードから解放することになっており、それは間違った方法であると言います。 –

+0

まず、あなたは絶対に正しいです。私はあなたがブロックコードを望んでいたという特別な理由があったと思いました。先物に基づいて同じコードで別の回答を追加します。 –