2011-12-21 5 views
1

私は観察可能なデータを散発的に返します。 1分間のデータがない場合、データを再度生成するまで毎分の最後のデータを繰り返す必要があります。どうすればこれを達成できますか?観測データがアイドル状態であることを検出して毎分データを注入するにはどうすればよいですか?

ありがとうございます。

+1

は、アイドル時間のチェックと最後に公開値を格納するフィールド用のタイマーを使用してください。 – Maheep

答えて

1

私は1分に少なくとも1回値を返すことが保証されているオブザーバブルにオブザーバブルをラップします。ラッパーは、ラップされたオブザーバブルが値を返すたびに再起動されるタイマーを実行することによって、これを行うことができます。

したがって、ラップされたオブザーバブルがデータを返すたびに、または最後のイベントから1分後にラッパーがデータを返します。

アプリケーションの残りの部分は、ラッパーを見るだけで便利です。

0

(あなたのソースがマルチスレッド化されている場合には、非スレッドセーフ)ここでRepeatAfterTimeoutオペレータの実装:私は私が持っていた

// Repeats the last value emitted after a timeout 
public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<TSource>(observer => 
    { 
     var timer = new MutableDisposable(); 
     var subscription = new MutableDisposable(); 

     bool hasValue = false; 
     TSource lastValue = default(TSource); 

     timer.Disposable = scheduler.Schedule(recurse => 
     { 
      if (hasValue) 
      { 
       observer.OnNext(lastValue); 
      } 

      recurse(); 
     }); 

     subscription.Disposable = source 
      .Do(value => { lastValue = value; hasValue = true; }) 
      .Subscribe(observer); 

     return new CompositeDisposable(timer, subscription); 
    }); 
} 

public static IObservable<TSource> RepeatAfterTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout) 
{ 
    return source.RepeatAfterTimeout(timeout, Scheduler.TaskPool); 
} 
+0

これは要件を満たしているとは思わない(たとえ実際にどこかでタイムアウトを使用したとしても)。ソースからのアクティビティに関係なく、タイマーを介して毎回値を出力しています。また、古いバージョンまたはRxを使用しています。 –

0

を期待通りにタイムアウトが機能しなかったとして更新

EDIT一度まったく同じ要件。最初のタイムアウトの前に値がトリガーされない場合に使用するデフォルト値を含めることを選択しました。ここでは、C#のバージョンがあります:

public static IObservable<T> 
AtLeastEvery<T>(this IObservable<T> source, TimeSpan timeout, 
       T defaultValue, IScheduler scheduler) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    if (scheduler == null) throw new ArgumentNullException("scheduler"); 
    return Observable.Create<T>(obs => 
     { 
      ulong id = 0; 
      var gate = new Object(); 
      var timer = new SerialDisposable(); 
      T lastValue = defaultValue; 

      Action createTimer =() => 
       { 
        ulong startId = id; 
        timer.Disposable = scheduler.Schedule(timeout, 
          self => 
          { 
           bool noChange; 
           lock (gate) 
           { 
            noChange = (id == startId); 
            if (noChange) obs.OnNext(lastValue); 
           } 
           //only restart if no change, otherwise 
           //the change restarted the timeout 
           if (noChange) self(timeout); 
          }); 
       }; 
      //start the first timeout 
      createTimer(); 
      var subscription = source.Subscribe(
       v => 
       { 
        lock (gate) 
        { 
         id += 1; 
         lastValue = v; 
        } 
        obs.OnNext(v); 
        createTimer(); //reset the timeout 
       }, 
       ex => 
       { 
        lock (gate) 
        { 
         id += 1; //'cancel' timeout 
        } 
        obs.OnError(ex); 
        //do not reset the timeout, because the sequence has ended 
       }, 
       () => 
       { 
        lock (gate) 
        { 
         id += 1; //'cancel' timeout 
        } 
        obs.OnCompleted(); 
        //do not reset the timeout, because the sequence has ended 
       }); 

      return new CompositeDisposable(timer, subscription); 
     }); 
} 

あなたはこのメソッドにデフォルトの1と代表者を選ぶの過負荷を行い、スケジューラを毎回渡すためにしたくない場合。私はScheduler.ThreadPoolを使用しました。

このコードは、ソースからの新しい値が入ったときにSerialDisposableの動作を使用して前回のタイムアウト呼び出しをキャンセルします。また、タイマが既に経過している場合に使用するカウンタもあります。スケジュールからの復帰は役に立たないが、その方法はまだ実際には実行されていない。

私はタイムアウトを変更する可能性を調査しましたが、私が取り組んでいた問題に対してはタイムアウトを必要としませんでした。私はそれが可能であったが、そのコードのどれも便利ではないことを思い出します。

2

あなたが望むことをするための1つのライナーがあります。私はそれをテストしたし、正しいと思われる。

var results = source 
    .Publish(xs => 
     xs 
      .Select(x => 
       Observable 
        .Interval(TimeSpan.FromMinutes(1.0)) 
        .Select(_ => x) 
        .StartWith(x)) 
      .Switch()); 

これがトリックであるかどうかを教えてください。

0

私はこれがRxの道働く(再帰なしをまだ副作用を含む)すべきだと思う:

public static IObservable<TSource> RepeatLastValueWhenIdle<TSource>(
     this IObservable<TSource> source, 
     TimeSpan idleTime, 
     TSource defaultValue = default(TSource)) 
    { 
     TSource lastValue = defaultValue; 

     return source 
      // memorize the last value on each new 
      .Do(ev => lastValue = ev) 
      // re-publish the last value on timeout 
      .Timeout(idleTime, Observable.Return(lastValue)) 
      // restart waiting for a new value 
      .Repeat(); 
    } 
関連する問題