var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
プリント
0から5
1 - 6
2から5
6 - - 6 ...複数のIObservableシーケンスを結合するには?
その代わりに、私は同じ値
5への参加を希望 7 - 7
8 - 8
...
これは、順序付き非同期シーケンスの100をマージする問題の単純化された例です。 2つのIEnumerableを結合するのはとても簡単ですが、Rxでこれを行う方法を見つけることができませんでした。何か案は?
入力と私が達成しようとしていることの詳細。基本的に、システム全体は、fork-joinパターンで接続された複数のステートマシン(アグリゲータ、バッファ、スムージングフィルタなど)を持つリアルタイムパイプラインです。 RXはそのようなものを実装するのに適していますか? 各入力は、データの各入力ビットは、したがって、すべてのイベントが自然に接合鍵(タイムスタンプ)によって順序付けされ、到着時にタイムスタンプされ
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
として表すことができます。イベントがパイプラインを通って移動すると、彼らは分岐して参加します。結合は、タイムスタンプによって相関され、事前定義された順序で適用される必要があります。たとえば、join(a、b、c、d)=> join(join(join(a、b)、c)、d)を実行します。
以下は私が急いで思いつくことができるものです。うまくいけば、既存のRxオペレータに基づくより簡単なソリューションがあります。
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});
は、[RXフォーラム](http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963-に同じ質問をし0c83-4968-a1b2-1317d5e31ae5) –