2017-02-07 7 views
1

私はRxJavaを介してリクエストへの応答を聞くことができるような状況に陥っています。問題は、Observableを設定してイベントをリスンし、正しい順序で購読してメッセージを送信する方法がわからないことです。スレッドが中断されたり、応答が超高速であれば、私はそれを逃すことができるので、メッセージを送信したくない。これは私が上で考えることができ最も近い自分のRxJavaで購読するには

connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      ... // do stuff 

または

Observable.defer(() -> { 
    connection.send(message); 
    return connection.onReceivedMessage(); 
})... // do stuff 

しかし、私はまだメッセージを送ることができ、応答を聴くことがないように、これらはまだいるようです。誰かがこれをやろうとしましたか?私は本当に一種のafterCreate()が欲しいと思う。

答えて

1

スレッド が中断取得または応答が超高速で、私はそれを逃すことができれば、ので、私は耳を傾け、そのメッセージを送信する必要はありません。

Subjectを使用してください。 BehaviorSubject(常に最新の観測値を新しい加入者に放出します)またはReplySubject(新しい加入者に放出されたすべてのObservableを放出します)のいずれかです。私は、全体のロジックについてはよく分からないですが、あなたのようなものかもしれない。この場合には、最新のメッセージが

に送信されますこのよう

public BehaviorSubject mMessageBehaviorSubject = BehaviorSubject.create(); 


private void sendMessage() { 
    connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      .subscribe(mSubject::onNext, Throwable::printStackTrace); 
} 


public Observable<String> getMessageObservable() { 
    return mMessageBehaviorSubject.asObservable(); 
} 

をメッセージとするたびに湯を聞く準備ができているを送ることができます

0

doOnSubscribeは必要なものです。探しているのは "afterCreate"です。あなたの最初の例では、最初のメッセージはサブスクリプション後に送信されます。この時点でObservableはすでにレスポンスを処理する準備ができています。

あなたの質問について - 他に誰かがこれをやろうとしましたか? - 応答ははいです。あなたの最初の例と同じテクニックを使っていくつかのコードをrxifyします。

+0

doOnSubscribeはそのように機能しますか?私はこれ以上テストしなければならないでしょう。私は確認するために単体テストをしたことはないと認めます。 – Buttink

関連する問題