2016-11-22 3 views
2

これに対してリアクティブエクステンションを使用することはできますか? 基本的に私はメッセージ(私の熱い観察可能なものが上に座っているところ)を監視したいESBを持っていて、あるスレッドがブロックされている/条件が満たされた、および/またはタイムアウトが発生する。Reactive Extensionsを使用すると、何らかの条件が満たされるか、タイムアウトが発生するまでObservableのサブスクライバを作成できます

私はこれをTasks/TaskCompletionSourceとCancellationトークンで実現できると知っていますが、RXはうまくいくように思えました。

EDIT:タイムアウトは観測可能ではありませんが、タイムアウト/条件が満たされると、加入者は退会して処分することができます。

EDIT 2:サブスクライバとオブザーバブルは、異なるスレッドで処理される可能性がありますが、必ずしもそうである必要はありません。サブスクライバでasync/awaitを使用している場合、サブスクライバが存在する場所で実行を一時停止またはブロックできるようにしたいと考えています。基本的には、いくつかの作業の完了を通知する

私は後に何をするには、加入者の生涯の管理を活用しているので、何もする必要はありません。 IConnectableObservableはこの分野でいくつかの約束を提供していたようです。

おそらく、これは四角いペグの丸い穴の領域です。

+0

は、それが何を意味するの加入者がブロック/一時停止していますか?通知の受信を停止しますか? – supertopi

+0

加入者は通知されます。通知(OnNextを介して)すると、眼球の状態が満たされた場合、加入者はObservableから削除/切断されます。 – brumScouse

答えて

3

これはあなたが後にしていることだとは思えません。サンプルコードは、常に役立つ清澄器です。トリックはソースではなく、あなたの事業者を置くことですが、サブスクリプションによって:

async void Main() 
{ 
    var hotObservable = new Subject<string>(); 

    Func<string, bool> sub1_Condition = s => s != "Disconnect Sub1"; 
    var subscription1 = hotObservable 
     .Timeout(TimeSpan.FromSeconds(5)) 
     .TakeWhile(sub1_Condition) 
     .Subscribe(s => Console.WriteLine($"Sub 1: {s}"), _ => Console.WriteLine("Sub1 Timeout."),() => Console.WriteLine("Sub1 condition met or source ended.")); 

    Func<string, bool> sub2_Condition = s => s != "Disconnect Sub2"; 
    var subscription2 = hotObservable 
     .Timeout(TimeSpan.FromSeconds(2)) 
     .TakeWhile(sub2_Condition) 
     .Subscribe(s => Console.WriteLine($"Sub 2: {s}"), _ => Console.WriteLine("Sub2 Timeout."),() => Console.WriteLine("Sub2 condition met or source ended.")); 

    Func<string, bool> sub3_Condition = s => s != "Disconnect Sub3"; 
    var subscription3 = hotObservable 
     .Timeout(TimeSpan.FromSeconds(7)) 
     .TakeWhile(sub2_Condition) 
     .Subscribe(s => Console.WriteLine($"Sub 3: {s}"), _ => Console.WriteLine("Sub3 Timeout."),() => Console.WriteLine("Sub3 condition met or source ended.")); 

    hotObservable.OnNext("Hello"); 
    hotObservable.OnNext("Disconnect Sub1"); 

    await Task.Delay(TimeSpan.FromSeconds(3)); 

    hotObservable.OnNext("Just Sub 3 should be left"); 
    hotObservable.OnCompleted(); 
} 

これは次の出力を生成します。

Sub 1: Hello 
Sub 2: Hello 
Sub 3: Hello 
Sub1 condition met or source ended. 
Sub 2: Disconnect Sub1 
Sub 3: Disconnect Sub1 
Sub2 Timeout. 
Sub 3: Just Sub 3 should be left 
Sub3 condition met or source ended. 
+0

あなたは何を知っています!私はこれが私が欲しいものにv.closeだと思う。私はコードが助けることに同意する。私はこれをコード化します(私はTask、Tcsバリアントを実装しています)が、私にとってこのソリューションはサブスクリプション管理のためにきれいです。私は意味がありますか?多分私は利益をオーバープレイしている... – brumScouse

+0

私は思いますか? Rxはあなたがそれに慣れていない限り、あまり直感的ではありませんが、タスクよりもクリーンです。どのシナリオでこれはあなたのためには機能しませんか? – Shlomo

+0

私はそれをしなかったというわけではありません。代わりの実装を使用してチームの作業を迅速に進めました。 – brumScouse

関連する問題