2016-03-30 10 views
1

を動作するコードですされていません。はなぜ、この永続的な俳優はここ

import akka.persistence._ 
import akka.actor.{Actor, ActorRef, ActorSystem, Props, ActorLogging} 


class Counter extends PersistentActor with ActorLogging { 

    import Counter._ 

    var state: State = new State(0) 

    override def receiveRecover: Receive = { 
    case RecoveryCompleted => println("Recovery completed.") 
    case SnapshotOffer(_, snapshot: State) => state = snapshot 
    case op: Operation => updateState(op) 
    } 


    override def persistenceId: String = "counter-persistent" 

    override def receiveCommand: Receive = { 
    case op: Operation => 
     println(s"Counter receive ${op}") 
     persist(op) { 
     op => updateState(op) 
     } 
    case "print" => println(s"The current state of couter is ${state}") 
    case SaveSnapshotFailure(_, reason) => println(s"save snapshot failed, reason: ${reason}") 
    case SaveSnapshotSuccess(_) => println(s"snapshot saved") 
    } 

    def updateState(op: Operation): Unit = op match { 
    case Increment(n) => 
     state = state.inc(n) 
     takeSnapshot 
    case Decrement(n) => 
     state = state.dec(n) 
     takeSnapshot 
    } 

    def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot() 
    } 
} 


object Counter { 

    sealed trait Operation { 
    val count: Int 
    } 

    case class Increment(override val count: Int) extends Operation 

    case class Decrement(override val count: Int) extends Operation 

    final case class State(n: Int) { 
    def inc(x: Int) = State(n + x) 

    def dec(x: Int) = State(n - x) 
    } 

} 







object Persistent extends App { 

    import Counter._ 

    val system = ActorSystem("persistent-actors") 

    val counter = system.actorOf(Props[Counter]) 

    counter ! Increment(3) 
    counter ! Increment(5) 
    counter ! Decrement(3) 
    counter ! "print" 

    Thread.sleep(1000) 

    system.terminate() 

} 

構成(application.conf):

akka { 
    persistence { 
    journal { 
     plugin = "akka.persistence.journal.leveldb", 
     leveldb { 
     dir = "target/example/journal", 
     native = false 
     } 
    }, 
    snapshot-store { 
     plugin = "akka.persistence.snapshot-store.local", 
     local { 
     dir = "target/example/snapshots" 
     } 
    } 
    } 
} 

二回のアプリを実行すると、状態は全くない永続的であることを示しています。

Recovery completed. 
Counter receive Increment(3) 
Counter receive Increment(5) 
Counter receive Decrement(3) 
The current state of couter is State(5) 
snapshot saved 
snapshot saved 
snapshot saved 

Recovery completed. 
Counter receive Increment(3) 
Counter receive Increment(5) 
Counter receive Decrement(3) 
The current state of couter is State(5) 
snapshot saved 
snapshot saved 
snapshot saved 

なぜですか?

+0

永続性プラグインを設定しましたか?どのジャーナルを使用していますか? – manub

+0

あなたが働いていないものに直面している問題は何ですか?これについてさらに情報を提供してください。 – curious

+0

@manub設定が公開されました。 – qed

答えて

3

ここでの問題は、アクタが受け取るすべての操作メッセージの後にスナップショットを作成することですが、スナップショットを取っている間はその状態を保存していないということです。あなたが密接にあなたtakeSnapshotコードを見れば:saveSnapshot()

def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot() 
    } 

コールに渡される引数がないので、あなたの状態のスナップショットを取ることはありません。

あなたは少し、このようにあなたにtakeSnapshot方法を変更する必要があります

def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot(state) // Pass the states you need to store while taking a snapshot. 
    } 

これは動作します。

関連する問題