2012-11-19 6 views
9

報告目的で時間ベースのイベントを集計するために使用されるC#(.NET 4.5)アプリケーションを作成しています。私のクエリロジックをリアルタイムデータと履歴データの両方で再利用できるようにするには、Reactive Extensions(2.0)とインフラストラクチャISchedulerHistoricalSchedulerとフレンド)を使用します。例えばObservable.Generate()がSystem.StackOverflowExceptionをスローするのはなぜですか?

、我々は(年代順にソートされたが、彼らは一致していてもよい!)、その唯一のペイロードそのタイムスタンプイストや固定期間のバッファ間でその分布を知りたいイベントのリストを作成すると仮定します

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

次のスタックトレース(すべての方法ダウンし最後の3行をit's)とSystem.StackOverflowExceptionにこのコードの結果を実行:

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

[OK]を、問題がObservable.Generate()の私の使用から来ているようだ、リストに応じて、サイズ(num)、スケジューラの選択にかかわらず。

私は間違っていますか?または、より一般的には、IEnumerableのイベントから独自のタイムスタンプを提供するIObservableを作成するにはどうすればよいでしょうか?

+1

このエラーが発生する前に、どのくらいの大きさの数値を入力できますか?また、デバッガでこれをシングルステップ実行すると、エラーが表示される前に実行される最後のコード行は何ですか? –

+0

私にとっては、クリティカルなしきい値は〜num = 51600であるようです(リリース構成では、デバッグ設定で少し下回ります)。観測可能なシーケンスは完全に作成されたようです。私は 'Observable.Generate()'のlamdba式でブレークポイントを打つことができます。例外は 'Console.WriteLine()'の最後の呼び出しの後にスローされます。 –

+1

これは単なる推測ですが、ストリームが各要素を破棄しようとしていて、各要素がストリームを破棄しようとしているように見えます。基本的に再帰的に 'Cancel'や' Dispose'を呼び出し、スタックを吹き飛ばします(デフォルトサイズは1メガバイトです)。私はなぜこれが起こっているのかを知るために「Observable」と十分に精通していない。 –

答えて

3

は(アップデート - 私は選択肢を提供していないことに気付きました。で見ます答えの末尾)

問題は、Observable.Generateがどのように動作するかにあります。これは、引数に基づいてcorecursive(再帰が内側になったと思う)ジェネレータを展開するために使用されます。それらの議論が非常に入れ子corecursiveジェネレータを生成した場合、あなたはあなたのスタックを爆破するでしょう。

私は多くのことを思っています(私の目の前にはRxソースはありません) (下記参照)、私はあなたの定義が:

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

コールスタックがオーバーフローするほど大きくなるまで、オンとオフを繰り返します。例えばシグネチャ+あなたのintカウンタでは、再帰呼び出しあたり8-16バイト(ステートマシンジェネレータの実装方法によります)のようなものでしょうから、右に約60,000音(1M/16〜62500最大深さ)

EDIT:ソースをプルアップ - 確認:このようなルックスを生成する「ファイル名を指定して実行」の方法を - Generateにネストされた呼び出しに注意してください:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

EDIT:DERP、提供していませんでした任意の選択肢...ここではうまくいくかもしれません:

(編集:固定Enumerable.Range、ストリームサイズはchunkSize

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

ありがとう、それは完璧です!私自身の回避策よりも効率的であるようです。私はあなたの算術で小さな​​誤りを修正しなければなりませんでした(編集を参照)。 RX内で再帰的な実装がなぜ必要なのかはまだ分かりません。結局のところ、それはRX v1.0(60,000をはるかに超える)で動作するようです。それでも、良い調査、巧妙な解決策。再度、感謝します! –

+0

問題ありません! Heh - 私は実際には1つだけの数学的なエラーがあったことに感心しています...;) – JerKimball

3

OK、私は状態遷移としてlamdba式を必要としない別のファクトリメソッドを使用しましたが、今はスタックオーバーフローがもう発生しません。まだこれは私の質問に対する正しい答えとしての資格かどう確認 - 私が、それは動作し、私はI'dはここでそれを共有して考えた:

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

手動で前にイベントをスケジュールするサブスクリプションらしいを返します(!)私にとっては厄介ですが、この場合はラムダ式の中で行うことができます。

この方法について何か問題がある場合は、修正してください。また、私は元のコードに違反していたSystem.Reactiveの暗黙の前提を聞いて喜んでいます。

(!私のああ、私は以前のことを確認しておく必要があります。RX v1.0をして、オリジナルのObservable.Generate()が実際に動作するようには思えない)

関連する問題