私はRxJavaが新しく、可逆バックプレッシャをサポートするRabbitMQキューからのObservableメッセージを実装しようとしています。私はSpring AMQP MessageListenerからObservableを作成することができました。これは、同期環境(例えば、callstack blocking)で背圧をうまく処理しますが、複数のスレッドが導入されるとすぐに、バックプレッシャーがウィンドウから外れます。クラスは以下の通りです:RxJava RabbitMQでバックプレッシャを実装する方法は?
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.subscriptions.Subscriptions;
import javax.inject.Inject;
@Component
public class CommandExchange {
private final MessageConverter messageConverter;
private final ConnectionFactory connectionFactory;
@Inject
public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {
this.messageConverter = messageConverter;
this.connectionFactory = connectionFactory;
}
public <T extends Command> Observable<T> observeQueue(String... queueNames) {
return Observable.create(subscriber -> {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueNames);
container.setMessageListener((MessageListener) message -> {
T command = (T) messageConverter.fromMessage(message);
if (!subscriber.isUnsubscribed()) {
System.out.println("Being asked for a message.");
subscriber.onNext(command);
}
});
container.start();
Subscriptions.create(container::shutdown);
});
}
}
私がブロックしたり、バッファリングせずにここでロスレスbackpressueを実装する方法のまわりで私の頭を取得することはできません。 Rabbit MQキューはすでにバッファであるため、バッファリングを使用するのは意味がありません。したがって、メッセージは、サブスクライバが準備ができているときにのみキューから消費されるべきです。解決策は、プルベースの観測可能性を使用することです(リスナーを使用せずに代わりにgrab a message when there is demand from the subscriber)。もしそうなら、現在キューにメッセージがない場合を処理するためのベストプラクティスは何でしょうか?
これについてもう少しお手伝いが必要ですか?必要に応じて詳細を追加できます。 –
'観測 observeQueue(文字列QUEUENAME)公共{ Observable.OnSubscribe ステートレス= SyncOnSubscribe.createStateless(S - > { s.onNext((T)rabbitTemplate.receiveAndConvert(QUEUENAME、-1))。 }); return Observable.create(ステートレス).subscribeOn(Schedulers.newThread()); } ' これはうまくいきますが、新しいスレッドで実行しているブロッキングオブザーバがここでベストプラクティスであるかどうかはわかりません。 –