2011-09-20 5 views
2

私がしたい:別のIObservableが公開されるまで、IObservableにサブスクライブしてデータをバッファリングする方法は?

  1. はすぐにIObservable<T>に加入しますが、すぐに(すなわち、まだ私のIObserver<T>からは見えない)が受信される任意のTをバッファリングを開始します。
  2. いくつか作業してください。
  3. 作業が完了すると、私のIObserver<T>にバッファをフラッシュし、

を続けることは、サブスクリプションが発生した最初のものであることは極めて重要です。 T + 1のそれに私自身がIObservable<int>s1に依存していることIObservable<bool>rに加入し、私はこのような何かの後だ「大理石の図」形式で

...

Time     T+1 2 3 4 5 6 7 8 
s1:IObservable<int>  1 2 3 4 5 6 7 8 
s2:IObservable<bool>   t  
r: IObservable<int>   1 3 4 5 6 7 8 
           2 

...とIObservable<bool>s2s1は私がコントロールしていないストリームです。s2は私がコントロールするものです(件名)。作業が終わったらpublishがオンになります。

私はSkipUntilが私を助けてくれると思っていましたが、従属する前に受け取ったイベントをバッファリングしません。IObservableが完了しました。

私はうまくいくと思っていたコードですが、SkipUntilはバッファではないためです。

 var are = new AutoResetEvent(false); 
     var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1)); 

     events.Subscribe(x => Console.WriteLine("events:" + x),() => are.Set()); 

     var subject = new Subject<int>(); 
     var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5)); 

     Console.WriteLine("Subscribing to events..."); 

     events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x)); 
     Console.WriteLine("Subscribed."); 

     completed.Subscribe(x => Console.WriteLine("Completed")); 

     subject.OnNext(10); 

     are.WaitOne(); 
     Console.WriteLine("Done"); 

私は様々なBuffer方法について知っているが、私は本当に私のサブスクリプションの開始時に活動を調整し、ここではバッファリングしていないですと、彼らは、この場合には、適切ないないようです。これは、私の作品

public static class ObservableEx 
{ 
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed) 
    { 
     var observable = Observable.Create<TSource>(o => 
     { 
      var replaySubject = new ReplaySubject<TSource>(); 
      var sub1 = source.Subscribe(replaySubject); 
      var query = 
       completed.Take(1).Select(
        x => replaySubject.AsObservable()); 
      var sub2 = query.Switch().Subscribe(o); 
      return new CompositeDisposable(sub1, sub2); 
     }); 
     return observable; 
    }   
} 

答えて

4

::私は役に立つかもしれない次の拡張メソッドにEnigmativityの応答を一般化している

UPDATE

var r = Observable.Create<int>(o => 
{ 
    var rs = new ReplaySubject<int>(); 
    var subscription1 = s1.Subscribe(rs); 
    var query = from f in s2.Take(1) select rs.AsObservable(); 
    var subscription2 = query.Switch().Subscribe(o); 
    return new CompositeDisposable(subscription1, subscription2); 
}); 
+0

ブリリアント、素晴らしい作品!今すぐそれを理解する;-) –

関連する問題