2013-03-07 12 views
6

私はrxと特定のクエリで立ち往生しています。 問題:条件が変わるまでRx groupby

多くの単一更新操作は連続ストリームによって生成されます。操作は挿入または削除できます。私はそれらのストリームをバッファリングし、当時はほとんど操作を実行しませんが、順序を保持することは本当に重要です。また、操作のシーケンスにバッファリングして行われるべきであるX秒ごと

例:では

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete 

アウト:

insert(3)-delete(2)-insert(1)-delete(4) 

は、私がテストに簡単なアプリケーションを書きましたそれは多かれ少なかれ私のように動作しますが、受信した挿入/削除の順序を尊重しません

namespace RxTests 
{ 
using System; 
using System.Collections.Generic; 
using System.Globalization; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Text; 
using System.Threading; 

internal class Program 
{ 
    private static readonly Random Random = new Random(); 

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource(); 

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>(); 

    private static void Main(string[] args) 
    { 
     Console.WriteLine("Starting production"); 
     var producerScheduler = new EventLoopScheduler(); 
     var consumerScheduler = new EventLoopScheduler(); 
     var producer = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .SubscribeOn(producerScheduler) 
         .Subscribe(Produce, WriteProductionCompleted); 
     var consumer = 
      operations.ObserveOn(producerScheduler) 
         .GroupBy(operation => operation.Delete) 
         .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50)) 
         .SubscribeOn(consumerScheduler) 
         .Subscribe(WriteUpdateOperations); 
     Console.WriteLine("Type any key to stop"); 
     Console.ReadKey(); 
     consumer.Dispose(); 
     producer.Dispose(); 
    } 

    private static void Produce(long time) 
    { 
     var delete = Random.NextDouble() < 0.5; 
     Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time); 
     var idString = (time + 1).ToString(CultureInfo.InvariantCulture); 
     var id = time + 1; 
     operations.OnNext(
      new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture))); 
    } 

    private static void WriteProductionCompleted() 
    { 
     Console.WriteLine("Production completed"); 
     ProducerStopped.Cancel(); 
    } 

    private static void WriteUpdateOperation(UpdateOperation updateOperation) 
    { 
     Console.WriteLine("Consuming {0}", updateOperation); 
    } 

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation) 
    { 
     foreach (var operation in updateOperation) 
     { 
      WriteUpdateOperation(operation); 
     } 
    } 

    private class UpdateOperation 
    { 
     public UpdateOperation(long id, bool delete, params string[] changes) 
     { 
      this.Id = id; 
      this.Delete = delete; 
      this.Changes = new List<string>(changes ?? Enumerable.Empty<string>()); 
     } 

     public bool Delete { get; set; } 

     public long Id { get; private set; } 

     public IList<string> Changes { get; private set; } 

     public override string ToString() 
     { 
      var stringBuilder = new StringBuilder("{UpdateOperation "); 
      stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete); 
      if (this.Changes.Count > 0) 
      { 
       stringBuilder.Append(this.Changes.First()); 
       foreach (var change in this.Changes.Skip(1)) 
       { 
        stringBuilder.AppendFormat(", {0}", change); 
       } 
      } 

      stringBuilder.Append("]}"); 
      return stringBuilder.ToString(); 
     } 
    } 
} 

}

誰もが右のクエリで私を助けることができますか?

おかげ

UPDATE 08.03.13(JerKimballによる提案)

次の行は、結果を印刷するJerKimballのコードに小さな変更/追加され、次の使用

using(query.Subscribe(Print)) 
{ 
    Console.ReadLine(); 
    producer.Dispose();   
} 

印刷方法:

private static void Print(IObservable<IList<Operation>> operations) 
{ 
    operations.Subscribe(Print); 
} 

private static void Print(IList<Operation> operations) 
{ 
    var stringBuilder = new StringBuilder("["); 
    if (operations.Count > 0) 
    { 
     stringBuilder.Append(operations.First()); 
     foreach (var item in operations.Skip(1)) 
     { 
      stringBuilder.AppendFormat(", {0}", item); 
     } 
    } 

    stringBuilder.Append("]"); 
    Console.WriteLine(stringBuilder); 
} 

と操作のための文字列に、次の:私は別のサブスクリプション内の加入についてはよく分からない

  • public override string ToString() 
    { 
        return string.Format("{0}:{1}", this.Type, this.Seq); 
    } 
    

    秩序は維持されるが、それは正しい(つまり、私は以来持っている質問ですされます長い時間前に、私に決して決してなかった)?

  • 私はいつも(ストリームが同じタイプで二つ以上の連続した値を生成する場合でも)各リストには以上の2つの要素を持っていない
+0

'私は別の内の加入についてはよく分かりませんサブスクリプション:それは正しい '< - あなたはこれによって何を意味しますか?これはどこで起こっていますか?あなたが 'IDisposable'を漏らしているためである場合にのみ、あなたは' Print'でこれをしたくありません。 – JerKimball

+0

'Subscribe'は' Action ' - これを使用します。そのため私の例では 'Console.WriteLine'を使用しています。 'Print'のシグネチャを' IList 'に変更した場合は、単に' using(query.Subscribe(Print)) 'とすることができます。 – JerKimball

+0

IObservable を選択してください>最初のPrint(IObservable >操作)。だから私はもう一度2番目の操作をラップします。サブスクライブ(印刷) – fra

答えて

1

のは、新しいアプローチ(したがって、新しい答えを)試してみましょう:

まずは、順序を保持したまま、キーに基づいてアイテムのだろう「崩壊」のリスト拡張メソッドを定義してみましょう:

public static class Ext 
{ 
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
      this IEnumerable<T> source, 
      Func<T, TKey> keySelector) 
    { 
     using (var enumerator = source.GetEnumerator()) 
     { 
      if (!enumerator.MoveNext()) 
       yield break; 

      var currentSet = new List<T>(); 

      // inspect the first item 
      var lastKey = keySelector(enumerator.Current); 
      currentSet.Add(enumerator.Current); 

      while (enumerator.MoveNext()) 
      { 
       var newKey = keySelector(enumerator.Current); 
       if (!Equals(newKey, lastKey)) 
       { 
        // A difference == new run; return what we've got thus far 
        yield return currentSet; 
        lastKey = newKey; 
        currentSet = new List<T>(); 
       } 
       currentSet.Add(enumerator.Current); 
      } 

      // Return the last run. 
      yield return currentSet; 

      // and clean up 
      currentSet = new List<T>(); 
      lastKey = default(TKey); 
     } 
    } 
} 

かなり簡単 - IEnumerable<T>を指定すると、List<List<T>>が返されます。ここで、各サブリストは同じキーを持ちます。

、それを養う、それを使用する:

var rnd = new Random(); 
var fakeSource = new Subject<Operation>(); 
var producer = Observable 
    .Interval(TimeSpan.FromMilliseconds(1000)) 
    .Subscribe(i => 
     { 
      var op = new Operation(); 
      op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
      fakeSource.OnNext(op); 
     });  

var singleSource = fakeSource 
    .Publish().RefCount(); 

var query = singleSource 
    // change this value to alter your "look at" time window 
    .Buffer(TimeSpan.FromSeconds(5))  
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0)); 

using(query.Subscribe(batch => 
{ 
    foreach(var item in batch) 
    { 
     Console.WriteLine("{0}({1})", item.First().Type, item.Count); 
    } 
})) 
{ 
    Console.ReadLine(); 
    producer.Dispose();  
} 

が旋回がいることを与える - ここで私は典型的な実験で見たものです。

insert(4) 
delete(2) 
insert(1) 
delete(1) 
insert(1) 
insert(1) 
delete(1) 
insert(1) 
delete(2) 
delete(2) 
insert(2) 
delete(1) 
insert(1) 
delete(2) 
insert(2) 
+0

ありがとうございます! これは私が必要とするものです。私はあなたに恵まれた恩恵を授けよう! – fra

4

MethinksあなたはGroupByUntilのミックスを後にしているものを手に入れることができます、DistinctUntilChanged、およびBuffer

これはあなたの例のコードに合うように微調整のビットを必要としますが、クエリ(およびコンセプト)ホールドする必要があります

(編集:DOH - そこにビットを逃しました...)

void Main() 
{ 
    var rnd = new Random(); 
    var fakeSource = new Subject<Operation>(); 
    var producer = Observable 
     .Interval(TimeSpan.FromMilliseconds(1000)) 
     .Subscribe(i => 
      { 
       var op = new Operation(); 
       op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
       fakeSource.OnNext(op); 
      });  
    var singleSource = fakeSource.Publish().RefCount(); 

    var query = singleSource 
     // We want to groupby until we see a change in the source 
     .GroupByUntil(
       i => i.Type, 
       grp => singleSource.DistinctUntilChanged(op => op.Type)) 
     // then buffer up those observed events in the groupby window 
     .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50)); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     Console.ReadLine(); 
     producer.Dispose();   
    } 
} 

public class Operation { 
    private static int _cnt = 0; 
    public Operation() { Seq = _cnt++; } 
    public int Seq {get; set;} 
    public string Type {get; set;}  
} 
+0

こんにちは!ありがとう、それは私に予想される出力を与えるものではありません。 2つのストリームを切り替えるのにもっと必要とするので、GroupByは私のニーズに合っていないと私は実際考えています。 ソリューションの出力を確認できますか?ありがとう、 – fra

+0

@fra Hah - 一部を逃した... *一息*私は編集します... ok、更新されたバージョンをお試しください? – JerKimball

+0

ありがとうございます、私たちはほとんどそこにいます。まだいくつかの疑問と問題があります。私の更新された質問を確認してください – fra