2017-07-14 1 views
0

私はリアクションパラダイム全体に慣れていないし、SQSのような待ち行列から読むときの背圧の働きを理解しようとしている。反応器内のReactor/RxJava SQSから読むときのバックプレッシャー

あなたのフラックスを持っているだろうし、RxJavaにあなたのような背景の中で、あなたの観察可能なポーリングSQSを持っていると思います:

while (true) { 
    Future<ReceiveMessageResult> future = sqsClient.receiveMessageAsync(queueUrl); 
    //emit or send to subscribers 
} 

は、あなたがあるREST呼び出しを行う必要がある下流のコンポーネントを持っているとしましょうレート制限付き。ポーラーにレート制限のために減速するように指示すると、OOMの可能性を秘めたライブメッセージがメモリに残ってしまうことはありませんか?

答えて

1

このシナリオでは、Flowableを使用する必要があります。 Flowableを使用すると、サブスクライバは、受信したメッセージを処理した後、一度に特定の数のメッセージを要求することができます。

参考:https://medium.com/@srinuraop/rxjava-backpressure-3376130e76c1

Flowable<Integer> observable = Flowable.range(1, 133); 
observable.subscribe(new DefaultSubscriber<Integer>() { 
@Override public void onStart() { 
request(1); 
} 
@Override public void onNext(Integer t) { 
LOGGER.info(“item “+t); 
//this where you request message, in this case one message at time and after processing one message it will request for next one. 
request(1); 
} 
@Override public void onError(Throwable t) { 
LOGGER.info(“”+t); 
} 
@Override public void onComplete() { 
LOGGER.info(“complete”); 
} 
}); 
+0

がそれを手に入れました!ありがとう! – 2Real

関連する問題