2016-08-03 10 views
5

注:hereからのスレッドの再送信です。複数のスレッドを持つSQSキューからの読み取り

こんにちはすべて、 メッセージを処理するプロセスが1つのSQSキューにあります。キューにはメッセージが多数あり、各メッセージはデータベースにヒットします。したがって、私はこのキューの読者をスレッド化したかったのです。

各スレッドのための基本的なコードは次のとおりです。

public void run() { 
    while(true) { 
     ReceiveMessageRequest rmr = new ReceiveMessageRequest(queueUrl) 
       .withMaxNumberOfMessages(10) 
       .withWaitTimeSeconds(3); 
     List<Message> messages = sqsClient.receiveMessage(rmr).getMessages(); 
     // process messages 
     // delete messages 
    } 
} 

私が見ている何のスレッド間で重複したメッセージのトンがあるということです。ここではいくつかの重複があるはずですが、各スレッドが同じメッセージセットを取得し、現実的には1つのスレッドだけが多くの作業を行うように見えます。

APIの使い方を誤解していますか、他に何か間違っていますか? Javadocは、AmazonSQSクラスがスレッドセーフであることを示しています。実際、各スレッドの新しいAmazonSQSクラスを作成しても何も変わりませんでした。

すべてのポインタが高く評価されます。私の現在の考え方は、SQSキューから単一のスレッドを読み込み、各メッセージをLinkedBlockingDequeのようなものに入れて、ワーカーにそのメッセージを読み込ませることです。しかし、私はその実装が私が望むほど速くキューを排水しないと感じています。

+1

何シングルスレッドでキューに耳を傾け、その後、あなたが受け取る各メッセージを処理するために、新しいスレッドをスピンアップはどうですか? –

+0

@マークB - それは私の最後の段落で提案したもののバリエーションです - 私はそれを避けることを望んでいましたが、それが最良の賭けかもしれません。 – stdunbar

+2

プロセスにはどのくらい時間がかかりますか?メッセージを処理して削除する処理時間を与えるには、メッセージ要求でsetVisibilityTimeoutを使用する必要があります。 – Larry

答えて

1

メッセージごとにデータベースがヒットしたので、各メッセージの処理に時間がかかるようです。キューの可視性タイムアウトを増やす必要があります。 AWS SQSのドキュメントから

:メッセージが受信された直後

、それはキューに残ります。 他のコンシューマがメッセージを再度処理できないようにするため、Amazon SQS は、アビリティSQS が他のコンシューマコンポーネントがメッセージを受信して​​処理しないようにする時間を設定します。

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html

+0

長い視認性のタイムアウトは、長いメッセージ処理時間の場合に重複したメッセージで問題を引き起こす可能性がありますが、3分に設定されたため、私のケースでは根本的な原因ではありませんでした。 。私はAkkaの固定ディスパッチャーを使用して、単一のスレッドがキューからすべての読み取りを行うことを保証しました。メッセージ処理は異なるスレッドによって行われ、キューからの新しいメッセージの読み取りを妨げないようにする –

関連する問題