2017-08-22 5 views
0

RX .netでは、Keyでグループ分けされた最新のアイテムを最新のものでサンプリングすることは可能ですか?RX .netでは、Keyでグループ分けされた熱心観測で最新のアイテムをサンプリングできますか?

例えば、私は価格である価格、のIObservableを持っている場合:

Price 
- Key 
- Bid 
- Offer 

はのはIObservableが外部価格フィードにリンクされていると仮定しましょう。

RXを使用して1秒ごとにサンプリングされたキーでグループ化された最新の価格をすべて取り出せますか?

答えて

1

観察可能なものとしてsourceがあると仮定すると、キーでグループ化されサンプリングされたすべての価格が1秒前に返されます。

var sampled = source 
    .GroupBy(p => p.Key) 
    .SelectMany(o => o.Sample(TimeSpan.FromSeconds(1))); 

最後の1秒間にメッセージを受け取っていない価格がある場合、それは含まれません。

古い価格が含ましたい場合、これは動作します:

var sampled2 = source 
    .Scan(ImmutableDictionary<int, Price>.Empty, (state, p) => state.SetItem(p.Key, p)) 
    .Replay(1) 
    .RefCount(); 
var dummySubscription = sampled2.Subscribe(); 
var result = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .SelectMany(_ => sampled2.Take(1).SelectMany(state => state.Values)); 

だけで観測可能resultで行われたときにDummySubscriptionを処分することを確認してください。

+0

ありがとうございました。しかし、1秒以上前に受け取った価格もどうやって含めることができますか? – ahallan

+0

回答が更新されました。 – Shlomo

0

熱い観測可能性は、メモリ内の古い値を保持しませんが、あなたが知っている各キーの最終価格を、例えば辞書の中で見つけることができます。

次のサンプルコードを参照してください。

各既知のキーの最後の引用符を毎秒印刷する必要があります。

+0

私はビジネスロジックがObservable内でなければならないと思っています。 – supertopi

+0

のようなサブスクリプションではありません...ビジネスロジック...?質問とその答えをもう一度読んでください。 – mm8

+0

質問は 'IObservable 'から始めることを尋ねており、毎秒値を生成したいと考えています。これはそうしない。 – Enigmativity

0

これは必要な操作ですか?

IObservable<ImmutableDictionary<string, Price>> sampled = 
    Observable 
     .Create<ImmutableDictionary<string, Price>>(o => 
     { 
      var output = ImmutableDictionary<string, Price>.Empty; 
      return 
       source 
        .Do(x => output = output.SetItem(x.Key, x)) 
        .Select(x => Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(y => output).StartWith(output)) 
        .Switch() 
        .Subscribe(o); 
     }); 
関連する問題