2016-09-03 5 views
6

ReactiveX(RxJava)には演算子timeoutがあり、サブスクリプションストリームのすべての項目に適用されます。しかし、私は最初の応答をタイムアウトでチェックするだけで、次の応答のタイムアウトは気にしません。 RxJavaの演算子でこの要件をエレガントに実装するにはどうすればよいですか?次のようにそれを行うにはRxJavaはタイムアウトのある最初の応答項目のみをチェックします

答えて

2

一つの方法は次のとおりです。

Observable<Response> respStream = respStream(); 
ConnectableObservable<Response> sharedRespStream = respStream.publish(); 

Observable<String> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS); 
Observable<String> rest = sharedRespStream.skip(1); 
Observable<String> result = first.mergeWith(rest); 

sharedRespStream.connect(); 

result.subscribe(response -> handleResponse(response), error -> handleError(error)); 

コード自己説明です:シェア応答は、重複した要求を避ける放出される最初の項目にタイムアウトを適用し、最初の1以下の項目でそれをマージします。

3

これはもっと機能的な方法です。これはScalaでありますが、Javaに転写する必要があります。

val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!")) 

myStream amb myTimeout 

ambオペレータが最初に発し、観察の値を返します。

0

ベストのオプションは、すべてのアイテムにタイムアウトが発生し、サブスクリプションにも1つ(興味のあるもの)であるtimeout overloadを使用することです。私が説明しよう

observable.timeout(() -> Observable.empty() 
      .delay(10, TimeUnit.SECONDS), o -> Observable.never()) 

、最初func0は、サブスクライブ上で実行されます、そしてあなたが望む時間だけ遅延(完全放射する)、空に観察を放出します。 アイテムが到着する前に時間が過ぎれば、あなたが望むようにタイムアウトになります。 2番目のパラメータはfunc1が

別のオプションは、ルチアーノの提案、次の あるので、私達はちょうど決して通過しないために(完全にないか、何かを)あなたは何の使用を持っていない項目、間のタイムアウトを決定します、あなたはこのようにそれを行うことができます。

public static class TimeoutFirst<T> implements Transformer<T,T> { 

    private final long timeout; 
    private final TimeUnit unit; 

    private TimeoutFirst(long timeout, TimeUnit unit) { 
     this.timeout = timeout; 
     this.unit = unit; 
    } 

    @Override 
    public Observable<T> call(Observable<T> observable) { 
     return Observable.amb(observable, 
       Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name())))); 
    } 
} 

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) { 
    return new TimeoutFirst<>(timeout, seconds); 
} 

これはambを使用したかなりすっきりしたソリューションです。

関連する問題