2016-07-01 7 views
3

私はRxJavaが新しく、可逆バックプレッシャをサポートするRabbitMQキューからのObservableメッセージを実装しようとしています。私はSpring AMQP MessageListenerからObservableを作成することができました。これは、同期環境(例えば、callstack blocking)で背圧をうまく処理しますが、複数のスレッドが導入されるとすぐに、バックプレッシャーがウィンドウから外れます。クラスは以下の通りです:RxJava RabbitMQでバックプレッシャを実装する方法は?

import org.springframework.amqp.core.MessageListener; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.stereotype.Component; 
import rx.Observable; 
import rx.subscriptions.Subscriptions; 

import javax.inject.Inject; 

@Component 
public class CommandExchange { 
    private final MessageConverter messageConverter; 
    private final ConnectionFactory connectionFactory; 

    @Inject 
    public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) { 
     this.messageConverter = messageConverter; 
     this.connectionFactory = connectionFactory; 
    } 

    public <T extends Command> Observable<T> observeQueue(String... queueNames) { 
     return Observable.create(subscriber -> { 

      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
      container.setConnectionFactory(connectionFactory); 
      container.setQueueNames(queueNames); 
      container.setMessageListener((MessageListener) message -> { 
       T command = (T) messageConverter.fromMessage(message); 
       if (!subscriber.isUnsubscribed()) { 
        System.out.println("Being asked for a message."); 
        subscriber.onNext(command); 
       } 
      }); 
      container.start(); 

      Subscriptions.create(container::shutdown); 

     }); 
    } 
} 

私がブロックしたり、バッファリングせずにここでロスレスbackpressueを実装する方法のまわりで私の頭を取得することはできません。 Rabbit MQキューはすでにバッファであるため、バッファリングを使用するのは意味がありません。したがって、メッセージは、サブスクライバが準備ができているときにのみキューから消費されるべきです。解決策は、プルベースの観測可能性を使用することです(リスナーを使用せずに代わりにgrab a message when there is demand from the subscriber)。もしそうなら、現在キューにメッセージがない場合を処理するためのベストプラクティスは何でしょうか?

答えて

1

私はリスナーの使用をやめて、要求に応じてキューからメッセージを取得します。あなたは(なし待ちがあるかどうか、なし)あなたは、多かれ少なかれ、単に一つのメッセージを取得するためにとられるアクションを指定SyncOnSubscribe

Observable.create(new SyncOnSubscribe<T>() {...}); 

を使用している場合、要求会計と背圧は、すべてのあなたのために処理されます。

+0

これについてもう少しお手伝いが必要ですか?必要に応じて詳細を追加できます。 –

+0

'観測 observeQueue(文字列QUEUENAME)公共{ Observable.OnSubscribe ステートレス= SyncOnSubscribe.createStateless(S - > { s.onNext((T)rabbitTemplate.receiveAndConvert(QUEUENAME、-1))。 }); return Observable.create(ステートレス).subscribeOn(Schedulers.newThread()); } ' これはうまくいきますが、新しいスレッドで実行しているブロッキングオブザーバがここでベストプラクティスであるかどうかはわかりません。 –

関連する問題