2011-02-06 7 views
7
 var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     var zip = a.Zip(b, (x, y) => x + "-" + y); 
     zip.Subscribe(Console.WriteLine); 

プリント
0から5
1 - 6
2から5
6 - - 6 ...複数のIObservableシーケンスを結合するには?

その代わりに、私は同じ値
5への参加を希望 7 - 7
8 - 8
...

これは、順序付き非同期シーケンスの100をマージする問題の単純化された例です。 2つのIEnumerableを結合するのはとても簡単ですが、Rxでこれを行う方法を見つけることができませんでした。何か案は?

入力と私が達成しようとしていることの詳細。基本的に、システム全体は、fork-joinパターンで接続された複数のステートマシン(アグリゲータ、バッファ、スムージングフィルタなど)を持つリアルタイムパイプラインです。 RXはそのようなものを実装するのに適していますか? 各入力は、データの各入力ビットは、したがって、すべてのイベントが自然に接合鍵(タイムスタンプ)によって順序付けされ、到着時にタイムスタンプされ

public struct DataPoint 
{ 
    public double Value; 
    public DateTimeOffset Timestamp; 
} 

として表すことができます。イベントがパイプラインを通って移動すると、彼らは分岐して参加します。結合は、タイムスタンプによって相関され、事前定義された順序で適用される必要があります。たとえば、join(a、b、c、d)=> join(join(join(a、b)、c)、d)を実行します。

以下は私が急いで思いつくことができるものです。うまくいけば、既存のRxオペレータに基づくより簡単なソリューションがあります。

static void Test() 
    { 
     var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     //var zip = a.Zip(b, (x, y) => x + "-" + y); 
     //zip.Subscribe(Console.WriteLine); 

     var joined = MergeJoin(a,b, (x,y) => x + "-" + y); 
     joined.Subscribe(Console.WriteLine); 
    } 

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
    { 
     return Observable.CreateWithDisposable<string>(o => 
      { 
       Queue<int> a = new Queue<int>(); 
       Queue<int> b = new Queue<int>(); 
       object gate = new object(); 

       left.Subscribe(x => 
        { 
         lock (gate) 
         { 
          if (a.Count == 0 || a.Peek() < x) 
           a.Enqueue(x); 

          while (a.Count != 0 && b.Count != 0) 
          { 
           if (a.Peek() == b.Peek()) 
           { 
            o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
           } 
           else if (a.Peek() < b.Peek()) 
           { 
            a.Dequeue(); 
           } 
           else 
           { 
            b.Dequeue(); 
           } 
          } 
         } 
        }); 

       right.Subscribe(x => 
       { 
        lock (gate) 
        { 
         if (b.Count == 0 || b.Peek() < x) 
          b.Enqueue(x); 

         while (a.Count != 0 && b.Count != 0) 
         { 
          if (a.Peek() == b.Peek()) 
          { 
           o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
          } 
          else if (a.Peek() < b.Peek()) 
          { 
           a.Dequeue(); 
          } 
          else 
          { 
           b.Dequeue(); 
          } 
         } 
        } 
       }); 

       return Disposable.Empty; 
      }); 
+0

は、[RXフォーラム](http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963-に同じ質問をし0c83-4968-a1b2-1317d5e31ae5) –

答えて

1

私は正直に考えることはできません未知の順序のホットソース(すなわち、xs before ysys before xs)に対応する既存の演算子に基づくソリューションの(それが動作するかどうか、ちょっと)あなたのソリューションは罰金だが、それは私のコードだった場合、私はいくつかの変更を加えたい:

  • サポートキャンセルが適切にセレクタからスローされた例外のためにMutableDisposableCompositeDisposable
  • コールOnErrorを使用して
  • 1つのソースは、他の以下のコードは、あなたのデュアルレンジ入力でテストされてい

、sの前に完了することが可能かどう完了を支える考えてみましょう(他の事業者とそれがより一貫します) AME入力はEmpty<int> + Never<int>で反転し、同様に:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
{ 
    return Observable.CreateWithDisposable<string>(o => 
    { 
     Queue<int> a = new Queue<int>(); 
     Queue<int> b = new Queue<int>(); 
     object gate = new object(); 

     bool leftComplete = false; 
     bool rightComplete = false; 

     MutableDisposable leftSubscription = new MutableDisposable(); 
     MutableDisposable rightSubscription = new MutableDisposable(); 

     Action tryDequeue =() => 
     { 
      lock (gate) 
      { 
       while (a.Count != 0 && b.Count != 0) 
       { 
        if (a.Peek() == b.Peek()) 
        { 
         string value = null; 

         try 
         { 
          value = selector(a.Dequeue(), b.Dequeue()); 
         } 
         catch (Exception ex) 
         { 
          o.OnError(ex); 
          return; 
         } 

         o.OnNext(value); 
        } 
        else if (a.Peek() < b.Peek()) 
        { 
         a.Dequeue(); 
        } 
        else 
        { 
         b.Dequeue(); 
        } 
       } 
      } 
     }; 

     leftSubscription.Disposable = left.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (a.Count == 0 || a.Peek() < x) 
        a.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      leftComplete = true; 

      if (a.Count == 0 || rightComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     rightSubscription.Disposable = right.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (b.Count == 0 || b.Peek() < x) 
        b.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      rightComplete = true; 

      if (b.Count == 0 || leftComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     return new CompositeDisposable(leftSubscription, rightSubscription); 
    }); 
} 
3

GroupByが必要な場合があります。アイテムが「参加」するときに時間的制約がないように見えますが、同様のアイテムがいくつかの方法で一緒になっている必要があります。参加したストリームの数百を持っているあなたのREQは、問題を提示しないように

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) 
.GroupBy(k => k) 
.Subscribe(go => go.Count().Where(cnt => cnt > 1) 
          .Subscribe(cnt => 
        Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

上記について注意するために、2つの事は、マージは、以下のオーバーロードがあります。

Merge<TSource>(params IObservable<TSource>[] sources); 
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources); 
Merge<TSource>(this IObservable<IObservable<TSource>> source); 

さらに、GroupBy戻りますIObservable<IGroupedObservable<TKey, TSource>>これは、あなたが各グループに反応することができ、各グループの新しいメンバーが入ってくるたびに反応することを意味します。完了するまで待つ必要はありません。

+0

唯一の問題は、値を順番に結合できる必要があることです。しかし、intの代わりにインデックス値のタプルを渡すと解決できます。 –

+0

「順番に」とはどういう意味ですか? –

+0

'Merge' +' Count'を使うことで、両方のソースシーケンスが終了するまでマッチすることはありません。これは 'Range'の例では問題ありませんが、もしあなたのソースがhot/unendingであれば、出力は期待通りではないかもしれません。 –

1

v.2838で新しい結合演算子を使用することはどうですか?

var a = Observable.Range(1, 10); 
var b = Observable.Range(5, 10); 

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) 
    .Where(tupple => tupple.Item1 == tupple.Item2); 

joinedStream.Subscribe(output => Trace.WriteLine(output)); 

これはJoinでの私の第一印象であり、私はこのようなNever演算子を使用するのが賢明だろうかはわかりません。膨大な量の入力を処理するときに大量の入力を処理すると、より多くの入力が再生されました。私は、マチェットが作られたときにウィンドウを閉じ、ソリューションをより効率的にするための作業ができると思います。上記の例は、あなたの質問に従って動作すると言われています。

私はスコットの答えはおそらくこのインスタンスに行く方法だと思います。私はこれを潜在的な代替案として投げかけています。

+0

+1参加する。私は昨日1時間を過ごし、それを働かせることができませんでした。私はあなたのパフォーマンスに関する懸念をお伝えします。また、生成されるコードは、単純なLINQ結合と比べてはるかに不明瞭で、追跡が困難です。私はRxがこの種の問題の良い解決策ではないと考え始めています。 –

+0

@Serger - これは、一致が行われたとき(つまり、Observable.Neverをもう少しインテリジェントなものに置き換える)の期間値を発行することで効率が向上すると確信しています。すべてのルールは、あなたがその期間を完了するのが安全なときのためのルールに依存します。 –

2

この答えは、それは同様にここにアーカイブされるだけのように、Rx forumsからコピーされます。

var xs = Observable.Range(1, 10); 
var ys = Observable.Range(5, 10); 

var joined = from x in xs 
    from y in ys 
    where x == y 
    select x + "-" + y; 

またはクエリ式を使用せず:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y}) 
    .Where(t => t.x == t.y) 
    .Select(t => t.x + "-" + t.y); 
+2

この解決策の唯一の問題は、 'ys'がホット(または' Multicast')であることを要求し、 'ys'値が' xs'値の前に現れるシナリオをサポートしていないことです。 –