2016-02-18 3 views
5

Rx Vert.Xでチェーンされた要求を終了する方法は?RxJava Vert.xで連鎖HTTPリクエストを終了するには?

HttpClient client = Vertx.vertx().createHttpClient(); 
     HttpClientRequest request = client.request(HttpMethod.POST, 
       "someURL") 
       .putHeader("content-type", "application/x-www-form-urlencoded") 
       .putHeader("content-length", Integer.toString(jsonData.length())).write(jsonData); 
     request.toObservable(). 
       //flatmap HttpClientResponse -> Observable<Buffer> 
         flatMap(httpClientResponse -> { //something 
        return httpClientResponse.toObservable(); 
       }). 
         map(buffer -> {return buffer.toString()}). 
       //flatmap data -> Observable<HttpClientResponse> 
         flatMap(postData -> client.request(HttpMethod.POST, 
         someURL") 
         .putHeader("content-type", "application/x-www-form-urlencoded") 
         .putHeader("content-length", Integer.toString(postData.length())).write(postData).toObservable()). 
       //flatmap HttpClientResponse -> Observable<Buffer> 
         flatMap(httpClientResponse -> { 
        return httpClientResponse.toObservable(); 
       })......//other operators 
request.end(); 

ご存知のとおりです。最上位リクエストの場合はend()です。どのように私は内部の要求を終了するのですか? flatmap?私はそれを終了する必要がありますか?

答えて

1

次のコードのようにすることができます。

Vertxクライアントで取得したHttpClientRequestを直接使用しないことをお勧めします。代わりに、最初のサブスクリプションが受信されるとすぐにend()を呼び出す別のフローを作成します。

たとえば、ペアカスタムメソッド(この場合はrequest1()およびrequest2())でリクエストを取得できます。両方ともdoOnSubscribe()を使用して、必要なend()をトリガーします。 Read its description on the ReactiveX page

このexamleはVERTXreactivexを使用して、私はあなたがこの設定を使用ことを願っています。

import io.reactivex.Flowable; 
import io.vertx.core.http.HttpMethod; 
import io.vertx.reactivex.core.Vertx; 
import io.vertx.reactivex.core.buffer.Buffer; 
import io.vertx.reactivex.core.http.HttpClient; 
import io.vertx.reactivex.core.http.HttpClientRequest; 
import io.vertx.reactivex.core.http.HttpClientResponse; 
import org.junit.Test; 

public class StackOverflow { 

    @Test public void test(){ 

     Buffer jsonData = Buffer.buffer("..."); // the json data. 

     HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client. 

     request1(client) 
      .flatMap(httpClientResponse -> httpClientResponse.toFlowable()) 
      .map(buffer -> buffer.toString()) 
      .flatMap(postData -> request2(client, postData)) 
      .forEach(httpResponse -> { 
       // do something with returned data); 
      }); 

    } 

    private Flowable<HttpClientResponse> request1(HttpClient client) { 
     HttpClientRequest request = client.request(HttpMethod.POST,"someURL"); 
     return request 
       .toFlowable() 
       .doOnSubscribe(subscription -> request.end()); 
    } 

    private Flowable<HttpClientResponse> request2(HttpClient client, String postData) { 
     HttpClientRequest request = client.request(HttpMethod.POST,"someURL"); 
     // do something with postData 
     return request 
       .toFlowable() 
       .doOnSubscribe(subscription -> request.end()); 
    } 

} 
2

request.end()に電話をかけるには複数の方法があります。しかし、Vert.xのドキュメンテーションを掘り下げたり、ソースコードがあればそれを開いて、end()を呼び出すかどうかを調べることにします。そうでない場合は、

final HttpClientRequest request = ... 
request.toObservable() 
     .doOnUnsubscribe(new Action0() { 
      @Override 
      public void call() { 
       request.end(); 
      } 
     }); 
関連する問題