2016-12-14 1 views
2

私はscala + Akkaで私の手を試しています。フォールトトレランスを理解しようとしています。スーパーバイザからメッセージを受け取り、DBにデータを挿入する俳優がいます。スーパーバイザは、障害が発生したときにアクタを再起動します。Akkaフォールトトレランスのアプローチ

DBへの接続に問題がある場合に備えて、postRestart()の接続文字列を変更しています。現在、あるDBとの接続に問題が発生すると、アクタが再起動し、別のDBへのデータの挿入が開始されます。

これは十分なアプローチですか?推奨されるアプローチは何ですか?

スーパーバイザー:

class SocialSupervisor extends Actor { 

    override val supervisorStrategy=OneForOneStrategy(loggingEnabled = false){ 
    case (e:Exception)=>Restart 
    } 

    val post_ref=context.actorOf(Props[Post]) 
    def receive={ 
      case Get_Feed(feed)=>{ 
       //get data from feed 
       post_ref!Post_Message(posted_by,post) 
      } 
    } 
} 

俳優:

class Post extends Actor{ 
    val config1=ConfigFactory.load() 
    var config=config1.getConfig("MyApp.db") 

    override def postRestart(reason: Throwable) { 
     config=config1.getConfig("MyApp.backup_db") 
     super.postRestart(reason) 
    } 

    def insert_data(commented_by:String,comment:String){ 
      val connection_string=config.getString("url") 
       val username=config.getString("username") 
       val password=config.getString("password") 
       //DB operations 
    } 

    def receive={ 
     case Post_Message(posted_by,message)=>{ 
     insert_data(posted_by, message) 
     } 
    } 
} 

答えて

1

私はあなたはそれがより多くの "フォールトトレラント" にするためにあなたのコードを作ることができ、いくつかの改善点があると思います。

モジュール方式

それは&がどのActorSystemの独立した試験を使用することができるようにあなたは、おそらく俳優の残りの部分からあなたのinsert_data機能を分離する必要があります。あなたの俳優は彼らに非常にわずかなコードを持っていなければならないとreceive方法は、基本的に外部関数へのディスパッチャする必要があります:

object Post { 
    def insert_data(conn : Connection)(commented_by : String, comment : String) = { 
    ... 
    } 
} 

あなたも、さらに一歩進み、Connection依存関係を削除することができます。あなたの俳優の視点から挿入はPostMessageを取り込んで、有効な行更新の数を返す関数以外の何ものでもありません:

object Post { 
    //returns an Int because Statement.executeUpdate returns an Int 
    type DBInserter : Post_Message => Int 

あなたは今、以前のように、データベース接続に挿入することができます

def insertIntoLiveDB(connFactory :() => Connection) : DBInserter = 
    (postMessage : Post_Message) => { 
     val sqlStr = s"INSERT INTO .." 
     connFactory().createStatement() executeUpdate sqlStr 
    } 
    } 

またはテスト目的のために挿入を行い決して機能記述:

:今、あなたの俳優がほとんどないロジックを持ってい

//does no inserting 
    val neverInsert : DBInserter = (postMessage : Post_Message) => 0 
} 

を断然

class Post(inserter : Post.DBInserter) extends Actor { 

    def receive = { 
    case pm : Post_Message => inserter(pm) 
    } 

} 

フォールトトレランス

は、アプリケーション内の「障害」の最大の供給源は、データベースへConnectionであなたのケースで明らかに、ネットワークです。接続が失敗した場合に自動的にリフレッシュするための方法が必要です。私たちは、そうするために、工場の機能を使用することができます。問題がある場合

def basicConnFactory(timeoutInSecs : Int = 10) = { 

    //setup initial connection, not specified in question 
    var conn : Connection = ??? 

() => { 
    if(conn isValid timeoutInSecs) 
     conn 
    else { 
     conn = ??? //setup backup connection 
     conn 
    } 
    } 
} 

は今の接続の有効性は、各挿入時にテストされ、再確立されています。この工場では、俳優を作成するために使用することができます。

import Post.queryLiveDB 
val post_ref = 
    context actorOf (Props[Post], insertIntoLiveDB(basicConnFactory())) 

本番要件が厳しくなるにつれ、あなたはconnection poolを利用する工場をammendすることができます...

関連する問題