2017-11-21 5 views
0

私はaskモードを使用して、fun-Fという関数でactor-Aというアクターにリクエストを送信します。アクタは、別のシステムによって生成されたIDを非同期的に取得します。これが完了すると、このIDを含むメッセージをactor-Bという別のアクタに転送し、actor-BはDB操作を行い、私の場合は転送モードを使用するので、アクターBは送信者をfun-Fとして認識し、akkaはfun-Fに一時的なアクター名を与えるので、戻り値は臨時俳優に届けられます。アクターに非同期メソッドが含まれている場合、Askタイムアウト例外のデッドレターエラーが発生します

私の質問は次のとおりです。

私は別のシステムからIDを取得するために、同期メソッドを使用する場合は、俳優-BのDB操作の後、結果はに配信することができ、俳優-Bにこのメッセージを転送fun-Fの値であり、fun-Fは、akkaフレームワークランタイムによって一時的なアクタActor [akka:// ai-feedback-service/temp/$ b]として定義されています。

別のシステムからIDを取得するためにasync-methodを使用すると、完了したら、別のコールバックスレッドの未完了の{}コードブロックにメッセージを転送し、アクターBのDB操作は正常に処理されます戻り値はfun-Fで定義された値に渡すことはできません。この場合、fun-FはakkaフレームワークランタイムによってActor [akka:// ai-feedback-service/deadLetters]として定義されます。したがって、アクターBはその方法を失い、戻ってくる方法やこのメッセージをどこに配信すべきかを知らないので、Ask Time Out例外がログにスローされます。

どうすればこの問題を処理できますか?またはどのように私はこの死んだ手紙を避けることができますタイムアウト例外を尋ねる?

// this is the so-called fun-F [createFeedback] 
def createFeedback(query: String, 
        response: String, 
        userId: Long, 
        userAgent: String, 
        requestId: Long, 
        errType: Short, 
        memo: String): Future[java.lang.Long] = { 
    val ticket = Ticket(userId, 
         requestId, 
         query, 
         response, 
         errType, 
         userAgent, 
         memo) 
    val issueId = (jiraActor ? CreateJiraTicketSignal(ticket)) 
        .mapTo[CreateFeedbackResponseSignal].map{ r => 
     r.issueId.asInstanceOf[java.lang.Long] 
    } 
    issueId 
} 


//this is the so-called actor-A [jiraActor] 
//receive method are run in its parent actor for some authorization 
//in this actor only override the handleActorMsg method to deal msg 
override def handleActorMsg(msg: ActorMsgSignal): Unit = { 
    msg match { 
     case s:CreateJiraTicketSignal => 
      val issueId = createIssue(cookieCache.cookieContext.flag, 
            cookieCache.cookieContext.cookie, 
            s.ticket) 
      println(s">> ${sender()} before map $issueId") 
      issueId.map{ 
       case(id:Long) => 
        println(s">> again++issueId = $id ${id.getClass}") 
        println(s">>> $self/${sender()}") 
        println("again ++ jira action finished") 
        dbActor.forward(CreateFeedbackSignal(id,s.ticket)) 
       case(message:String) if(!s.retry) => 
        self ! CreateJiraTicketSignal(s.ticket,true) 
       case(message:String) if(s.retry) => 
        log.error("cannot create ticket :" + message) 
      } 
      println(s">> after map $issueId") 
} 


//this is the so-called actor-B [dbActor] 
override def receive: Receive = { 
    case CreateFeedbackSignal(issueId:Long, ticket:Ticket) => 
     val timestampTicks = System.currentTimeMillis() 
     val description: String = Json.obj("question" -> ticket.query, 
              "answer" -> ticket.response) 
              .toString() 
     dao.createFeedback(issueId, 
          ticket.usrId.toString, 
          description, 
          FeedbackStatus.Open.getValue 
           .asInstanceOf[Byte], 
          new Timestamp(timestampTicks), 
          new Timestamp(timestampTicks), 
          ticket.usrAgent, 
          ticket.errType, 
          ticket.memo) 

     println(s">> sender = ${sender()}") 
     sender() ! (CreateFeedbackResponseSignal(issueId)) 
     println("db issue id is " + issueId) 
     println("db action finished") 
} 

答えて

0

が死んで手紙の問題を回避するには、次の操作を行います:識別子(おそらくrequestId)を使用し、すべてのリクエストについては

  1. をあなたが関連付けることができます以下

    は私のコードです要請の最終的な目標と結びついています。つまり、createFeedbackメソッドに渡しているrequestIdを、そのメソッドの呼び出し元(ActorRef)に結び付け、メッセージチェーンにこのIDを渡します。これらの関連付けを保持するためにマップを使用することができます。

    • 変更CreateFeedbackResponseSignal(issueId)TicketクラスからrequestIdを含める:CreateFeedbackResponseSignal(requestId, issueId)
  2. アクター内部からFutureの非同期結果を扱う

  3. pipe Futureコールバックを使用する代わり selfの結果。このアプローチで

    • 結果が利用可能な場合、createIssueの結果はjiraActorに送信されます。 jiraActorは、その結果をdbActorに送信します。
    • jiraActorsenderで、dbActorになります。 jiraActordbActorの結果を受け取ると、jiraActorはその内部マップのターゲットへの参照をルックアップできます。以下は

あなたのユースケースを模倣し、ScalaFiddleで実行可能である簡単な例です:

次の出力でScalaFiddle結果で上記のコードを実行する
import akka.actor._ 
import akka.pattern.{ask, pipe} 
import akka.util.Timeout 

import language.postfixOps 

import scala.concurrent._ 
import scala.concurrent.duration._ 

case class Signal(requestId: Long) 
case class ResponseSignal(requestId: Long, issueId: Long) 

object ActorA { 
    def props(actorB: ActorRef) = Props(new ActorA(actorB)) 
} 

class ActorA(dbActor: ActorRef) extends Actor { 
    import context.dispatcher 

    var targets: Map[Long, ActorRef] = Map.empty 

    def receive = { 
    case Signal(requestId) => 
     val s = sender 
     targets = targets + (requestId -> s) 
     createIssue(requestId).mapTo[Tuple2[Long, Long]].pipeTo(self) // <-- use pipeTo 
    case ids: Tuple2[Long, Long] => 
     println(s"Sending $ids to dbActor") 
     dbActor ! ids 
    case r: ResponseSignal => 
     println(s"Received from dbActor: $r") 
     val target = targets.get(r.requestId) 
     println(s"In actorA, sending to: $target") 
     target.foreach(_ ! r) 
     targets = targets - r.requestId 
    } 
} 

class DbActor extends Actor { 
    def receive = { 
    case (requestId: Long, issueId: Long) => 
     val response = ResponseSignal(requestId, issueId) 
     println(s"In dbActor, sending $response to $sender") 
     sender ! response 
    } 
} 

val system = ActorSystem("jiratest") 
implicit val ec = system.dispatcher 

val dbActor = system.actorOf(Props[DbActor]) 
val jiraActor = system.actorOf(Props(new ActorA(dbActor))) 

val requestId = 2L 

def createIssue(requestId: Long): Future[(Long, Long)] = { 
    println(s"Creating an issue ID for requestId[$requestId]") 
    Future((requestId, 99L)) 
} 

def createFeedback(): Future[Long] = { 
    implicit val timeout = Timeout(5.seconds) 
    val res = (jiraActor ? Signal(requestId)).mapTo[ResponseSignal] 
    res.map(_.issueId) 
} 

createFeedback().onComplete { x => 
    println(s"Done: $x") 
} 

Creating an issue ID for requestId[2] 
Sending (2,99) to dbActor 
In dbActor, sending ResponseSignal(2,99) to Actor[akka://jiratest/user/$b#-710097339] 
Received from dbActor: ResponseSignal(2,99) 
In actorA, sending to: Some(Actor[akka://jiratest/temp/$a]) 
Done: Success(99) 
+0

こんにちはChunjef、ありがとう、本当にあなたの親切な助けに感謝、それは私のために働く。ちなみに、私は別の質問があります。このようにしてターゲットActorRefをマップに格納する必要があります。akkaには、それらを格納する必要がない他のメカニズムがあり、より直接的に処理しますか? – zj43

関連する問題