2011-01-11 9 views
4

イベント処理を実装する必要があります。これは、一定期間新しいイベントが到着しなかった場合に遅延します。 (テキストバッファが変更されたときに解析タスクをキューに入れなければならないが、ユーザーがまだ入力しているときに解析を開始したくない。)RXのタイムアウトでバッファリングを実装する方法

私はRXで新しくなっているが、 BufferWithTimeメソッドとTimeoutメソッドを組み合わせる必要があります。私はこれが次のように機能していると想像しています。イベントは、後続のイベントの間に指定された時間内に定期的に受信されるまでバッファリングされます。イベント・フローに時間間隔よりも長い時間間隔がある場合、それまでにバッファリングされたイベントを伝播して戻す必要があります。

BufferとTimeoutがどのように実装されているか見てみると、BufferWithTimeoutメソッドを実装することができます(誰もが持っていれば分かち合う)が、既存のメソッドを組み合わせるだけで実現できるのだろうか。何か案は?

答えて

3

私は BufferWithTimeがあなたの後になっていると思います。

そこに組み込まれて何もありませんが、このような何かが動作するはずです:

注:エラーソースから発生した場合、バッファがフラッシュされません。これは、あなただけのユーザーが一定時間入力して停止したときの動作を実行する必要があり、必ずしも中間を必要としない場合BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout) 
{ 
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool); 
} 

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<TSource[]>(observer => 
    { 
     object lockObject = new object(); 
     List<TSource> buffer = new List<TSource>(); 

     MutableDisposable timeoutDisposable = new MutableDisposable(); 

     Action flushBuffer =() => 
     { 
      TSource[] values; 

      lock(lockObject) 
      { 
       values = buffer.ToArray(); 
       buffer.Clear(); 
      } 

      observer.OnNext(values); 
     }; 

     var sourceSubscription = source.Subscribe(
      value => 
      { 
       lock(lockObject) 
       { 
        buffer.Add(value); 
       } 

       timeoutDisposable.Disposable = 
        scheduler.Schedule(flushBuffer, timeout); 
      }, 
      observer.OnError, 
      () => 
      { 
       flushBuffer(); 
       observer.OnCompleted(); 
      }); 

     return new CompositeDisposable(sourceSubscription, timeoutDisposable); 
    }); 
} 
+0

BufferWithTimeは定期的にトリガをトリガしますが、特定のアイドル時間が経過した後にトリガしたいと考えています。 –

+0

これは、ギャップがない場合、決して出力を受け取らないことを意味しますか? –

+0

はい、ギャップがない場合は、イベントを受信したくありません。しかし、イベントはユーザーのキー入力によって生成され、タイムアウトは100msですので、遅かれ早かれギャップがあります:) –

2

の現在の(または現在の最後に私がチェックした時)の機能と一致しましたイベントの場合は、Throttleが後の操作です。そのシナリオでの使用例については、hereを参照してください。

+0

私の場合は、最終的な結果だけでなく、大きなテキストブロックでも、変更された部分のみを再解析するだけでなく、個々の変更がそうでない場合は正しいと思います。興味深いことに、スロットルは良い選択です。 –

+0

私のアプリケーションでは、いくつかの処理をしたいユーザーが3Dビューでカメラの移動を停止したときに表示されます。スロットルはこのための良い選択でした。 – TortoiseTNT

2

Richard Szalayの回答に加えて、最新のrxリリースの新しいWindowオペレータを探しています。それはあなたの問題を解決します。つまり、タイムアウトでバッファすることができます。つまり、タイムアウトに達するまでの時間枠内で出力を取得しますが、IEnumerableとして結果を受け取る代わりに実際に取得しますIObservableとして。ここで

は、私が何を意味するかの簡単な例です:

private void SetupStream() 
{ 
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
     h => new MouseButtonEventHandler(h), 
     h => MouseDown += h, 
     h => MouseDown -= h); 

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher)) 
     .Switch(); 

    inputStream.Window(() => timeout) 
     .Subscribe(OnWindowOpen); 
} 


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window) 
{ 
    Trace.WriteLine(string.Format("Window open")); 

    var buffer = new List<IEvent<MouseButtonEventArgs>>(); 

    window.Subscribe(click => 
    { 

     Trace.WriteLine(string.Format("Click")); 

     buffer.Add(click); 

    },() => ProcessEvents(buffer)); 
} 

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks) 
{ 
    Trace.WriteLine(string.Format("Window closed")); 

    //... 
} 

ウィンドウが開くたびに、あなたは、として、すべてのイベントを受け取り、彼らが来たときに、ウィンドウが完了したときに、(バッファおよびプロセスに保存します次のウィンドウが開くと実際に発生します)。

リチャードがWindowを使用するように彼の例を変更するかどうかはわかりませんが、これは利用可能ですが、代わりとして価値があると考えられました。

+0

興味深い。私は最新のリリースを見ることを意味してきましたが、今は言い訳があります! –

3

私は "として、以下を提供します。これはかなり古い質問ですが、私はなど

、変更を追跡し、他のすべてのソリューションは、手動で購読するユーザーを余儀なくされていますので、以下の答えは、言及する価値があると信じていますRx-y "溶液である。

var buffers = source 
    .GroupByUntil(
     // yes. yes. all items belong to the same group. 
     x => true, 
     g => Observable.Amb<int>(
       // close the group after 5 seconds of inactivity 
       g.Throttle(TimeSpan.FromSeconds(5)), 
       // close the group after 10 items 
       g.Skip(9) 
      )) 
    // Turn those groups into buffers 
    .SelectMany(x => x.ToArray()); 

基本的に、ソースは、最新のウィンドウの観点から観察可能なものまで定義されています。新しいウィンドウ(グループ化されたobservable)が作成され、そのウィンドウを使用してウィンドウをいつ終了するかを決定します。この場合、5秒間使用しなかった場合、または最大長が10(9 + 1)の場合、ウィンドウを閉じます。

関連する問題