2016-06-17 4 views
0

私のアプリケーションにはプロデューサとコンシューマがあります。私のプロデューサーは不規則にメッセージを出します。いつか私のキューは空になるでしょうが、いつか私はいくつかのメッセージを持っています。 私は消費者に待ち行列を聞かせてもらいたいと思います。メッセージが入っているときは、取り出してこのメ​​ッセージを処理してください。このプロセスには数時間かかることがあり、現在のメッセージの処理が完了していなければ、消費者にキューの別のメッセージを送信させたくありません。コンシューマ/プロデューサAWS SQSシンクロンコンシューマを使用するakaスカラ

私はAKKAとAWS SQSが私のニーズを満たすことができると思います。文書や例を読むことで、akka-camelは私の仕事を単純化することができます。

これはギブスでexampleでした。

私は(のThread.sleepはちょうど私の処理をシミュレートすることである)消費者の構成では、より興味を持っています:

class MySqsConsumer extends Consumer { 

    //The SQS URI is an in-only message exchange (autoAck=true) 
    override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client" 

    override def receive = { 
    case msg: CamelMessage => { 
     println("Start received %s" format msg.bodyAs[String]) 
     Thread.sleep(4000) 
     println("Stop received %s" format msg.bodyAs[String]) 

    } 

    } 
} 

はのStackTrace:

... 
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!] 
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!] 
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!] 
Start received Hello World1! 
Stop received Hello World1! 
Start received Hello World2! 
Stop received Hello World2! 
Start received Hello World3! 
... 

ここに私の問題は、そのakka-ですcamelは、受信メソッドが完了する前にキューからメッセージを読み取ります。

新しいメッセージを受け取る前に、消費者にプロセスの終了を待たせることができますか?間違ったツールやライブラリを使用している場合は、新しいツールに向けて私を案内できますか?

答えて

1

わからないあなたがアッカのストリームを使用している場合は、しかし、あなたがしている場合、Sink.queueにソースを送っ

val graph=yourSource.to(Sink.queue) 

それはSinkQueueを返します(graph.run)マテリアライズすると、手動でアイテムを引くことができますがそれから。背圧メカニズムを使用するので、アイテムを引っ張っても何が起きるのか心配する必要はありません。

注:私は意図的にジェネリックを無視しました!しかし、それらを忘れないでください。

0

私が知る限り、あなたはそれをすることはできません。 Camelは常にキューからメッセージを消費します。

override def autoAck = falseを設定することができます。あなたが現在のメッセージを確認するまで、別のメッセージをに送信しません。しかし、「隠された」ラクダの俳優はまだ消費しています。

カスタマイズしたamazonSQSClientを提供し、何とかそれを制御できますか?

関連する問題