2011-01-27 3 views
2

私のモデルには、約8〜9個のScalaアクタがあります。 各アクターは、いくつかのScalaアクタが同時に実行しているときに8-10​​人のアクタが待機状態になる

各アクターの行為の方法で

.IT連続

def act { 
    this ! 1 
    loop { 
     react { 
     case 1 => processMessage(QManager.getMessage); this ! 1 
     } 
    } 
    } 

I RabbitMQのQManagerのgetMessageメソッド

def getMessage: MyObject = { 
    getConnection 
    val durable = true 
    channel.exchangeDeclare(EXCHANGE, "direct", durable) 
    channel.queueDeclare(QUEUE, durable, false, false, null) 
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY) 
    consumer = new QueueingConsumer(channel) 
    channel basicConsume (QUEUE, false, consumer) 

    var obj = new MyObject 
    try { 
     val delivery = consumer.nextDelivery 
     val msg = new java.io.ObjectInputStream(
     new java.io.ByteArrayInputStream(delivery.getBody)).readObject() 
     obj = msg.asInstanceOf[MyObject] 
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) 
    } catch { 
     case e: Exception =>logger.error("error in Get Message", e);endConnection 
    } 
    endConnection 
    obj 
    } 

ようキュー にリストアップされたのRabbitMQサーバー上の独自のキューを持っていますすべての9人のアクターは、独自のオブジェクトタイプと独自のQManagerを持っています

GetMessageで私はRabbitmq QueueConsu私は彼らの唯一の4は、他には記載しておりません正常に動作し、すべての8人の俳優を起動したときにキュー 、この方法ではitfounds状態

待機中の俳優を置くとき

val delivery = consumer.nextDelivery 

マー

とnextDeliveryメソッドは、オブジェクトを返します。私は4人の役者

は、Scalaのアクターのスレッドに問題がthererであることがより開始したとき アローン

問題を始めたとき、私はテスト一人ひとりの俳優が稼働して相まって、彼らが正常に動作していが発生します。

答えて

5

免責事項:レックスが言うように、私は、アッカ

のPOをしていますあなたがスレッドの共有プールに、スレッドを占有、ビジー待っています。

俳優としてあなたがアッカをテストするためのオプションを持っている場合、私は知らないが、我々はAMQP消費者(及び生産者)のサポートを持っている:AMQPの消費

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) 
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer")) 
producer ! Message("Some simple sting data".getBytes, "some.routing.key") 

Akka-AMQP

はAMQPメッセージをプロデュースメッセージ:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) 
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = { 
    case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload)) 
}}), None, Some(exchangeParameters))) 

別のオプションは、俳優

0でAMQPメッセージを消費して生産するために Akka-Camelを使用することです
+0

Akkaリンクありがとう –

5

すべてのあなたの俳優は常に動いています。彼らは休憩を取ることはありません。アクターは共通のスレッドプールで共有されるので、これは幸運な勝者の俳優が常に動いていることを意味し、不運な敗者は決して時間をとることはありません。常にスレッド全体を取得するエンティティを使用したい場合は、reactの代わりにreceiveを使用するか、少なくともreactの代わりにreceiveを使用することが一般的です(Java Thread)。俳優の数を俳優の数と一致させるために俳優プールのサイズを増やすこともできますが、一般的に非常に多くの俳優が常駐している場合は、プログラムの構造をより慎重に考える必要があります。

+0

私は俳優プールのサイズを増やす方法を知っているかもしれません –

+0

回答を見るhttp://stackoverflow.com/questions/1597899 –

関連する問題