2016-12-14 7 views
2

私は多数の観測値を持ち、これらを単一の値に集計する必要があります。彼らは頻繁に更新され、「損失の多い」背圧戦略が使用されなければならないと考えられているので、それを取る。私はsampleを選択しましたが、それと似たようなものは受け入れられるでしょう。 これを行う最も効率的な方法(パフォーマンスおよび/またはリソースに賢い)がであることを知りたいと思います。 2つのアプローチが明白です:多数のrx観測値を効率的にサンプリングする

val list : Iterable[Observable[Double]] = ??? 
val f : (Double,Double) => Double = ??? 
(Observable combineLatest list) sample (1 second) reduce f 

または代わり

combineLatest (list map (_ sample (1 second))) reduce f 

私は、これは非常に一般的なユースケースであると言うために公正だと思います。これを抽象化しないようにするには、データを連続的に放出する多くの温度センサーがあると想像してください。これらは私のlistの要素になります。彼らはSI単位で温度を生成し、平均温度を計算したい(これはfです)。また、結果をCelciusまたはFahrenheitで表示するかどうかを選択したいと思っています。

  1. 上記の2つのアプローチ(他の提案は歓迎します)のどちらがより効率的なスペース/時間ですか?
  2. 私は平均気温だけでなく、個々の(変換されたかもしれない)気温も表示したかったと言います。それが答えに影響しますか?
  3. Observablesが異なるソース(別々のスレッドで観察されるなど)から来た場合と、1つのシリアライズされたストリーム(1つの観測でgroupBy演算子の結果など)に含まれているかどうかによって、
  4. 上記の背圧について念頭におくと、この要件が破棄された場合(つまり、再サンプリングする必要がない場合)、回答が異なる可能性があります。
+0

あなたは解決策を見つけましたか? –

+0

いいえ。私はRXから移った。私はこれに共通のパターンであるにもかかわらず、特殊なデータ構造が必要だと思います。これを実装するのは難しいことではありません(私が上に示唆したように)、しかし、ハードビットは、モナド/コレクションのようなインターフェイスでそれをやっています。例えばhttps://github.com/OpenHFT/Chronicle-Mapを見てください。 – Luciano

答えて

0

ここでは、as演算子が便利だと思います。私の場合、私は観測の束を持っているだけではありません。私は observablesを持っているものをたくさん持っています。だから、最初に私は、その観測可能で、それぞれの事を関連付けるためにヘルパークラスを作成しました:

public class Pair<K, V>{ 
    private final K mKey; 
    private final Observable<V> mValues; 

    public Pair(final K key, final Observable<V> values) { 
     mKey = key; 
     mValues = values; 
    } 
    public K key() { 
     return mKey; 
    } 

    public Observable<V> values() { 
     return mValues; 
    } 
} 

その後、私はObservable<Map<K,V>>Observable<Pair<K,V>>を変換ObservableConverterを作成しました。これは、定期的に各Kから最新のVを含むマップを放出します:

public class MapLatest<K, V> implements ObservableConverter<Pair<K, V>, Observable<Map<K, V>>> { 
    private final PublishSubject<Map<K,V>> mPub = PublishSubject.create(); 
    private final Map<K, V> mMap = new ConcurrentHashMap<>(); 

    @Override 
    public Observable<Map<K, V>> apply(final Observable<Pair<K, V>> pairs) { 
     pairs.subscribe(this::subscribe); 
     return mPub; 
    } 

    private void subscribe(final Pair<K,V> pair) { 
     final K k = pair.key(); 
     pair.values().doOnNext(v -> mMap.put(k, v)).doOnTerminate(() -> mMap.remove(k)); 
    } 
} 

私の「もの」はプレーヤーであると私は定期的にそれらの位置のR-ツリーを構築したいです。だから私は、その場所の観測で選手をペアリングして、MapLatestを適用します。私は実際には何を発するMapLatestで任意のコードを示さなかった

players.map(player -> new Pair<>(player, player.location())) 
    .as(new MapLatest<>()).subscribe(...); 

注意。私はそれを行うためのさまざまな方法を検討している:

  • Observable<?>MapLatestを構築し、いつでもその観測可能発するを発します。
  • MapLatestPredicate<V>で構成し、それを発行するときにtrueを返す。
  • MapLatestemitメソッドを与え、外部から排出をトリガします。

また、Observable<Map<K,V>>は決して終了しません。私の目的のために、それは意図的です。

関連する問題