私のアプリケーションにはプロデューサとコンシューマがあります。私のプロデューサーは不規則にメッセージを出します。いつか私のキューは空になるでしょうが、いつか私はいくつかのメッセージを持っています。 私は消費者に待ち行列を聞かせてもらいたいと思います。メッセージが入っているときは、取り出してこのメッセージを処理してください。このプロセスには数時間かかることがあり、現在のメッセージの処理が完了していなければ、消費者にキューの別のメッセージを送信させたくありません。コンシューマ/プロデューサAWS SQSシンクロンコンシューマを使用するakaスカラ
私はAKKAとAWS SQSが私のニーズを満たすことができると思います。文書や例を読むことで、akka-camelは私の仕事を単純化することができます。
これはギブスでexampleでした。
私は(のThread.sleepはちょうど私の処理をシミュレートすることである)消費者の構成では、より興味を持っています:
class MySqsConsumer extends Consumer {
//The SQS URI is an in-only message exchange (autoAck=true)
override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client"
override def receive = {
case msg: CamelMessage => {
println("Start received %s" format msg.bodyAs[String])
Thread.sleep(4000)
println("Stop received %s" format msg.bodyAs[String])
}
}
}
はのStackTrace:
...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...
ここに私の問題は、そのakka-ですcamelは、受信メソッドが完了する前にキューからメッセージを読み取ります。
新しいメッセージを受け取る前に、消費者にプロセスの終了を待たせることができますか?間違ったツールやライブラリを使用している場合は、新しいツールに向けて私を案内できますか?