2012-04-24 6 views
0

RX(Reactive extensions)を使用してイベントアグリゲータを実装しようとしていますが、ここではコードが使用されていますが、 subject.AddDisposableメソッドがありません。誰でも助けてくれますか?多分古いバージョンなので、Rxの新しいバージョンでこのメソッドは削除されましたか?これを実装する正しい方法は何か?メソッドが反応性の範囲外にある

if (_observablesByTypeKey.ContainsKey(key)) 
      { 
       Tuple<object, object> tuple = _observablesByTypeKey[key]; 
       stream = (IObservable<T>)tuple.Item2; 
      } 
      else 
      { 
       Type specificSubjectType = typeof(Subject<>).MakeGenericType(new[] { typeof(T) }); 
       var subject = (Subject<T>)Activator.CreateInstance(specificSubjectType, new object[] { }); 

       var removeEventStreamFromCache = Disposable.Create(
        () => 
         { 
          lock (_observablesByTypeKeyLock) 
          { 
           _observablesByTypeKey.Remove(key); 
          } 
         } 
        ); 

       stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount(); 

       var tuple = new Tuple<object, object>(subject, stream); 
       _observablesByTypeKey.Add(key, tuple); 

答えて

2

あなたは

stream = Observable.Create(observer => 
    new CompositeDisposable(
    subject.Subscribe(observer), 
    removeEventStreamFromCache)); 
stream.Publish().RefCount(); 

stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount(); 

を置き換えることができます

関連する問題