2011-02-07 7 views
0

私はRXに慣れていません。希望のシナリオはうまくいきますが、これを達成するためにはもっとシンプルで洗練された方法が必要です。私が持っているのはIObservable<T>で、私はIObservable<U>,で終わるようにそれを購読したいと思っています。それは、見ているTごとにUを生成する非同期操作を引き起こします。IObservableを別のIObservableに非同期で依存させる簡単な方法はありますか?

私がこれまでに(それは素晴らしい動作しますが、面倒なようです)このような中間イベントストリームを使用して行く持っているもの:

public class Converter { 
    public event EventHandler<UArgs> UDone; 
    public IConnectableObservable<U> ToUs(IObservable<T> ts) { 
    var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay(); 
    ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t)))); 
    return us; 
    } 
    private void OnUDone(U u) { 
    var uDone = UDone; 
    if (uDone != null) { 
     uDone(this, u); 
    } 
    } 
} 

... 

var c = new Converter(); 
IConnectableObservable<T> ts = ...; 
var us = c.ToUs(ts); 
us.Connect(); 

... 

を、私は私が行うにははるかに簡単な方法が欠けていると確信していますこの...

+0

ここに「リプレイ」をしてもよろしいですか? –

答えて

1

SelectManyこれはまさにあるIO<IO<T>>

Observable.Range(1, 10) 
     .Select(ii => Observable.Start(() => 
      string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId))) 
     .SelectMany(id=>id) 
     .Subscribe(Console.WriteLine); 
+0

ありがとう、スコット。私はSelectManyについて知っていましたが、何らかの理由で(単一項目の)オブザーバブルの観測可能性を平坦化するという点でこの問題を考慮していませんでした。 – lesscode

+0

しかし私はこれに問題があることに気付きました。OutOfMemoryExceptionでイベントストリームの途中でプロセスを爆破します。私のアプローチはそれをしなかった... – lesscode

0

を平らに、あなたが必要とする何をすべきSelectMany以下のためである:

IObservable<int> ts 

IObservable<string> us = ts.SelectMany(t => StartAsync(t)); 

us.Subscribe(u => 
    Console.WriteLine("StartAsync completed with {0}", u)); 

... 

private IObservable<string> StartAsync(int t) 
{ 
    return Observable.Return(t.ToString()) 
     .Delay(TimeSpan.FromSeconds(1)); 
} 

StartAsyncが可変の完了時間を持っている場合、あなたが入力値とは異なる順序で出力値を受け取ることができることに留意してください。

関連する問題