2017-02-23 5 views
0

Akkaクラスタアプリケーションのコンテキストでは、Akkaが期待する1つのプロパティに関する問題が発生しました。すべての(cas)クラスと使用されるすべてのメッセージは直列化可能でなければなりません。私は以下のコンテキストを持っています:Redisクラスタからデータを消費したいので、クラスタ対応のルータプールを採用して、ノードを追加してより多くの作業者を確保することにしました。作業者はredisからデータを読み取り、mongodbにいくつかのメタデータを保存します。Akkaクラスタ対応ルータ - すべてのルートにredisインスタンスを共有

object MasterWorkers { 

    def props 
    ( awsBucket : String, 
    gapMinValueMicroSec : Long, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) : Props = 
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName)) 

    case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp) 
} 

case class MasterWorkers 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val workerRouter = 
    context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)), 
    name = "workerRouter") 

ワーカークラス:

object Worker { 

    def props 
    (
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) : Props = 
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName)) 

    case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp) 
    case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp) 

    case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp) 
} 

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    persistentCache: RedisCache, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

しかし、私は二つのノード始めたとき、これは、以下の例外が発生します:Redisのキャッシュは簡単です

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. 
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895) 
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894) 
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786) 
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761) 
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497) 
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452) 
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl 

を最初のバージョンでは、私はこれをしませんでした次のようなインタフェースを実装したコンパニオンオブジェクトのケースクラスです。

​​

その後、私は労働者でredisCacheを移動し、問題を解決するために、私はマスターノードにそれを与えていないよ:

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

// redis cache here now 
val redisCache = ... 

しかし、そのようなデザインで、あらゆるrouteeはRedisの新しいインスタンスを作成します。キャッシュと予想される動作ではありません。私が望むのは、私のredisキャッシュのインスタンスを1つ作成してそれをすべてのルートと共有することですが、クラスタアプリケーションのコンテキストでは不可能なようですので、設計上の失敗か経験不足かはわかりませんAkkaと一緒に。誰かが同様の問題に直面した場合、私は喜んでアドバイスを受け取ります!

答えて

0

問題はあなたのRedisCacheがそれほど単純ではないということです。それはシリアル化できないActorSystemを持ちます。

これは、RedisClient個のインスタンスが含まれているためです - rediscalaライブラリで、ActorSystemが必要です。

俳優システムから抽象化し、作業者にはRedisクラスターの詳細(例:RedisServerオブジェクト)のみを渡す必要があります。

労働者は、context.systemを使用してRedisClient自身をインスタンス化します。

case class Worker 
(
    awsBucket : String, 
    gapMinValueMicroSec : Long, 
    replyTo : ActorRef, 
    redisMaster: RedisServer, 
    redisSlaves: Seq[RedisServer], 
    mongoURI : String, 
    mongoDBName : String, 
    mongoCollectioName : String 
) extends Actor with ActorLogging { 

    val masterSlaveClient = ??? //create from the RedisServer details 

} 

これにより、各作業者は、自分の接続をredisクラスタにすることができます。

また、マスターに一度だけ接続し、ワーカーとの接続を共有する場合は、接続を埋め込むRedisClientActor(ここではsource)を渡す必要があります。これはActorRefであり、リモートで共有できます。

これはclient.redisConnectionを呼び出すことで取得できます。

労働者は、その後、このソリューションについて例えば

case class Worker 
    (
     awsBucket : String, 
     gapMinValueMicroSec : Long, 
     replyTo : ActorRef, 
     redisConnection: ActorRef, 
     mongoURI : String, 
     mongoDBName : String, 
     mongoCollectioName : String 
    ) extends Actor with ActorLogging with ActorRequest { 

     // you will need to implement the execution context that ActorRequest needs as well.. 

     send(redisCommand) 

    } 
+0

をその周りActorRequestを構築することができ、それは各ワーカーはRedisのクライアントをインスタンス化することを意味しますか? – alifirat

+0

はい。私はちょうどそれがあなたが興味を持っている場合は、1つの接続を共有することを含むもう1つのアプローチを追加しました。 –

+0

はい、私は代わりに考えていませんでした。私はそれを試してあなたに知らせるでしょう! – alifirat

関連する問題