2015-12-14 19 views
6

私は実装している関数を呼び出すフレームワークを扱っています。私は、この関数のパラメータをObservableに変換し、一連のオブザーバを介して送信したいと思います。私はこのために主題を使うことができると思ったが、私が期待したように行動していない。RxJS:オブザーバが複数のオブザーバを処理する方法は?

明確にするために、私は次のコードのようなものを持っています。私はOption 1が動作すると思っていましたが、今のところ私はOption 2のために解決しています。それはまったく慣れないようです。

var eventSubject = new Rx.Subject(); 
var resultSource = eventSubject.map(processEvent); 
var subscription = resultSource.subscribe(
    function(event) { 
     console.log("got event", event); 
    }, 
    function(e) { 
     log.error(e); 
    }, 
    function() { 
     console.log('eventSubject onCompleted'); 
    } 
); 

// The framework calls this method 
function onEvent(eventArray) { 

    var eventSource = Rx.Observable.from(eventArray); 

    // Option 1: I thought this would work, but it doesn't 
    // eventSource.subscribe(eventSubject); 

    // Option 2: This does work, but its obviously clunky 
    eventSource.subscribe(
     function(event) { 
      log.debug("sending to subject"); 
      eventSubject.onNext(event); 
     }, 
     function(e) { 
      log.error(e); 
     }, 
     function() { 
      console.log('eventSource onCompleted'); 
     } 
); 
} 
+0

あなた自身を登録し、何かその 'onEvent'ハンドラですか? – user3743222

+0

いずれにしても、私が 'Subject.create(observer、observable)'を使っていると思うことは、あなたが渡すオブザーバが 'eventSource 'に渡したものとまったく同じことをするので、 .subscribe'ので、他の人の命題を見てみましょう。 – user3743222

+0

@ user3743222 - onEventは私が書いた関数ですが、私のフレームワーク(ループバック)は関数が存在することを知り、それ自身のコードから呼び出します。 fromEvent()またはfromEventPattern()メソッドは、フレームワークのハンドラの登録方法と一致しないため、使用できません。 – JBCP

答えて

2

それはあなたが観測可能に全体Subjectをサブスクライブするとき、あなたは被験者にその観察できるのonCompleteイベントをサブスクライブ終わるだけのことです。つまり、観測が完了すると、それはあなたの主題を完成させるでしょう。したがって、次の一連のイベントを取得すると、対象はすでに完了しているため、何もしません。

解決策の1つはまさにあなたがやったことです。件名にonNextイベントを登録するだけです。

var eventSubject = new Rx.Subject(); // observes observable sequences 
var resultSource = eventSubject 
    .mergeAll() // subscribe to each inner observable as it arrives 
    .map(processEvent); 
var subscription = resultSource.subscribe(
    function(event) { 
     console.log("got event", event); 
    }, 
    function(e) { 
     log.error(e); 
    }, 
    function() { 
     console.log('eventSubject onCompleted'); 
    } 
); 

// The framework calls this method 
function onEvent(eventArray) { 

    var eventSource = Rx.Observable.from(eventArray); 
    // send a new inner observable sequence 
    // into the subject 
    eventSubject.onNext(eventSource); 
} 

更新:Here is a running example

+0

愚かな質問:onNext、onError、onCompleteのコンボをオブジェクト(おそらくオブザーバ)にすることは可能ですか?モジュール化のためにすべてのものを別のファイルに移動することができますか?私は 'resultSource.subscribe(mySink)'(より流動的です)のようなことをしたいのですが、今は 'mySink(resultSource)'を実行しなければなりません。mySinkは内部的にサブスクリプションを行います。 – JBCP

+0

'.merge ) 'はドキュメントと一致していないようです:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/merge.md – JBCP

+0

あなたは' .mergeAll() 'を意味しますか? ? docs:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/mergeall.mdを参照してください。 – JBCP

3

ブランドンとして

第二の溶液、及び間違いなくより多くの「RXのようには、」観測の観測可能として、あなたの着信データを扱い、mergeAllと、この観測可能なストリームを平坦化することですeventSubject onNext、onError、onCompleteをonNext、onError、onCompleteの各オブザーバにサブスクライブすることによって、eventSubjectを別のオブザーバブルにサブスクライブすることが既に説明されています。あなたの例からは、onNextに登録したいだけのようです。

あなたの件名は、最初のeventSourceが完了すると/ errrosします。エラーが発生すると、eventSubjectは後続のeventSouresによって発生したそれ以上のonNext/onErrorを正しく無視します。

  1. は手動でのみonNextを購読する:

    はどんなのEventSourceのonNextに加入するために複数の方法があります。

    resultSource = eventSubject 
        .map(processEvent); 
    
    eventSource.subscribe(
        function(event) { 
         eventSubject.onNext(event); 
        }, 
        function(error) { 
         // don't subscribe to onError 
        }, 
        function() { 
         // don't subscribe to onComplete 
        } 
    ); 
    
  2. だけあなたのためonNext/onErrorをeventSourcesに加入扱う演算子を使用します。これはブランドンが提案したものです。 これは、eventSources onErrorにも加入していることに注意してください。これは、あなたの例のようには思えません。

    resultSource = eventSubject 
        .mergeAll() 
        .map(processEvent); 
    
    eventSubject.onNext(eventSource); 
    
  3. のonError/onCompleteのeventSourcesためのonError/onCompleteのeventSubjectsを呼び出すことはありませんオブザーバを使用してください。あなたは単に汚れたハックとしてeventSubjects onCompleteを上書きすることができますが、おそらく新しいオブザーバを作成する方が良いでしょう。

    resultSource = eventSubject 
        .map(processEvent); 
    
    var eventObserver = Rx.Observer.create(
        function (event) { 
        eventSubject.onNext(event); 
        } 
    ); 
    
    eventSubject.subscribe(eventObserver); 
    
関連する問題