2012-01-11 10 views
2

RSSフィード項目を取得し、処理してデータベースに保存する少数の俳優を産んでいます。これは、cronで実行されているオブジェクトの主な方法によって行われます。私はこれらの俳優を作成し、彼らに割り当てられた前の仕事を完了すると、彼らに仕事を出します。私のメインクラスは、ひとりの俳優、つまり俳優のプールに仕事を奪い取る俳優を生み出します。結局のところ、メインの方法はハングアップしているようです。終了しませんが、すべてのアクタで実行が停止します。私のCTOは、俳優が仕事を終えて去る前にメインが終了していると信じていますが、そうは確信していません。私はメインで成功出口を受け取っていない(全く出口がない)。スカラの俳優がぶら下がった

私はこれらのアクターをどのようにデバッグするのか、と考えられています。アクターが実行を完了する前にメインの出口が出るのですか?そうであれば、それは関係ありますか?私が受け取ったアクターがスレッドに1対1でマッピングされていることを教えてください。コードは以下のとおりです。フォローアップの質問をしてください、助けていただければ幸いです。私は十分な詳細を提供していない可能性があることを知っています、私はスカラーと俳優には新しく、必要に応じて更新します。あなたはすべてのスピナーの俳優が「終了」したか否かを判断するために、左を折るしているときに、小さなしかし重要な間違いを作っている一つのことについては

object ActorTester { 
    val poolSize = 10 
    var pendingQueue :Set[RssFeed] = RssFeed.pendingQueue 

    def main(args :Array[String]) { 
    val manager = new SpinnerManager(poolSize, pendingQueue) 
    manager.start 
    } 
} 

case object Stop 

class SpinnerManager(poolSize :Int = 1, var pendingQueue :Set[RssFeed]) extends Actor { 
    val pool = new Array[Spinner](poolSize) 

    override def start() :Actor = { 
    for (i <- 0 to (poolSize - 1)) { 
     val spinner = new Spinner(i) 
     spinner.start() 
     pool(i) = spinner 
    } 
    super.start 
    } 

    def act() { 
    for { 
     s <- pool 
     if (!pendingQueue.isEmpty) 
    } { 
     s ! pendingQueue.head 
     pendingQueue = pendingQueue.tail 
    } 

    while(true) { 
     receive { 
     case id :Int => { 
      if (!pendingQueue.isEmpty) { 
      pool(id) ! pendingQueue.head 
      pendingQueue = pendingQueue.tail    
      } else if ((true /: pool) { (done, s) => { 
      if (s.getState != Actor.State.Runnable) { 
       val exited = future { 
       s ! Stop 
       done && true 
       } 
       exited() 
      } else { 
       done && false 
      } 
      }}) { 
      exit 
      } 
     } 
     } 
    } 
    } 
} 

class Spinner(id :Int) extends Actor { 
    def act() { 
    while(true) { 
     receive { 
     case dbFeed :RssFeed => { 
      //process rss feed 
      //this has multiple network requests, to the original blogs, bing image api 
      //our instance of solr - some of these spawn their own actors 
      sender ! id 
     } 
     case Stop => exit 
     } 
    } 
    } 
} 

答えて

0

このように数日間のデバッグを行った後、私はこの問題を解決しました。 fotNeltonのコード提案はとても役に立ちましたので、私は彼に投票を与えました。しかし、彼らは問題自体に取り組まなかった。私が見いだしたことは、メインメソッドでこれを実行している場合、親アクターが子アクターの前に出ると、プログラムは永遠にハングして終了せず、すべてのメモリーを保持していることです。 RSSフィードを処理する過程で、Fetcherはアクターを生成し、ネットワーク要求を含む処理を行うメッセージを送信します。これらの俳優は、親の俳優が辞める前に作業を完了する必要があります。フェッチャーは、これらの俳優が終了するのを待つことはありません。一度彼がメッセージを送ったら、彼はただ移動します。だから彼は子供の俳優たちがすべての仕事を終える前に彼が終わったマネージャーに話すだろう。これに対処するには、先物を使用して、アクターが完了するまで待つ(かなり遅くする)ことです。私のソリューションは、URLを使ってアクセス可能なサービス(反応するのを待っているアクタを持つサービスへのPOST)を作成することでした。サービスはすぐに応答し、自分の俳優にメッセージを送ります。したがって、アクタは、サービスにリクエストを送信すると終了し、他のアクタを起動する必要はありません。

object FeedFetcher { 
    val poolSize = 10 
    var pendingQueue :Queue[RssFeed] = RssFeed.pendingQueue 

    def main(args :Array[String]) { 
    new FetcherManager(poolSize, pendingQueue).start 
    } 
} 

case object Stop 

class FetcherManager(poolSize :Int = 1, var pendingQueue :Queue[RssFeed]) extends Actor { 
    val pool = new Array[Fetcher](poolSize) 
    var numberProcessed = 0 

    override def start() :Actor = { 
    for (i <- 0 to (poolSize - 1)) { 
     val fetcher = new Fetcher(i) 
     fetcher.start() 
     pool(i) = fetcher 
    } 
    super.start 
    } 

    def act() { 
    for { 
     f <- pool 
     if (!pendingQueue.isEmpty) 
    } { 
     pendingQueue.synchronized { 
     f ! pendingQueue.dequeue 
     } 
    } 

    loop { 
     reactWithin(10000L) { 
     case id :Int => pendingQueue.synchronized { 
      numberProcessed = numberProcessed + 1 
      if (!pendingQueue.isEmpty) { 
      pool(id) ! pendingQueue.dequeue    
      } else if ((true /: pool) { (done, f) => { 
      if (f.getState == Actor.State.Suspended) { 
       f ! Stop 
       done && true 
      } else if (f.getState == Actor.State.Terminated) { 
       done && true 
      } else { 
       false 
      } 
      }}) { 
      pool foreach { f => { 
       println(f.getState) 
      }} 
      println("Processed " + numberProcessed + " feeds total.") 
      exit 
      } 
     } 
     case TIMEOUT => { 
      if (pendingQueue.isEmpty) { 
      println("Manager just woke up from timeout with all feeds assigned.") 
      pool foreach { f => { 
       if (f.getState == Actor.State.Suspended) { 
       println("Sending Stop to Fetcher " + f.id) 
       f ! Stop 
       } 
      }} 
      println("Checking state of all Fetchers for termination.") 
      if ((true /: pool) { (done, f) => { 
       done && (f.getState == Actor.State.Terminated) 
      }}) { 
       exit 
      } 
      } 
     } 
     } 
    } 
    } 
} 

class Fetcher(val id :Int) extends Actor { 
    var feedsIveDone = 0 
    def act() { 
    loop { 
     react { 
     case dbFeed :RssFeed => { 
      println("Fetcher " + id + " starting feed") 
      //process rss feed here 
      feedsIveDone = feedsIveDone + 1 
      sender ! id 
     } 
     case Stop => { 
      println(id + " exiting") 
      println(feedsIveDone) 
      exit 
     } 
     } 
    } 
    } 
2

。あなたがしなければならないのは、done && true respです。 if caseの最後にdone && falseがありますが、現在のところtrueと言っています。 falsedoneに関係なく。

たとえば、最初と2番目がRunnableで、3番目のファイルがでない場合はの4つのSpinnerアクターがあり、もう1つはRunnableです。その場合、第3の俳優がまだ完成していないにもかかわらず、あなたの折り畳みの結果はtrueになります。論理&&を使用していた場合、正しい結果が得られます。

これはアプリケーションがハングする原因となる可能性もあります。

EDIT:競合状態に問題がありました。次のコードは今動作します。役立つことを願っています。とにかく、私はScalaの俳優の実装が自動的にワーカースレッドを利用するのではないかと疑問に思っていましたか?

import actors.Actor 
import scala.collection.mutable.Queue 

case class RssFeed() 

case class Stop() 

class Spinner(id: Int) extends Actor { 
    def act() { 
    loop { 
     react { 
     case dbFeed: RssFeed => { 
      // Process RSS feed 
      sender ! id 
     } 
     case Stop => exit() 
     } 
    } 
    } 
} 

class SpinnerManager(poolSize: Int, pendingQueue: Queue[RssFeed]) extends Actor { 
    val pool = Array.tabulate(poolSize)(new Spinner(_).start()) 

    def act() { 
    for (s <- pool; if (!pendingQueue.isEmpty)) { 
     pendingQueue.synchronized { 
     s ! pendingQueue.dequeue() 
     } 
    } 

    loop { 
     react { 
     case id: Int => pendingQueue.synchronized { 
      if (!pendingQueue.isEmpty) { 
      Console println id 
      pool(id) ! pendingQueue.dequeue() 
      } else { 
      if (pool forall (_.getState != Actor.State.Runnable)) { 
       pool foreach (_ ! Stop) 
       exit() 
      } 
      } 
     } 
     } 
    } 
    } 

} 

object ActorTester { 
    def main(args: Array[String]) { 
    val poolSize = 10 
    val pendingQueue: Queue[RssFeed] = Queue.tabulate(100)(_ => RssFeed()) 
    new SpinnerManager(poolSize, pendingQueue).start() 
    } 
} 
+1

もちろん更新されました。もう一度テストします。ありがとう。 – Kareem

+0

まだぶら下がっています。しかし、ありがとう。 – Kareem

+0

編集を確認してください。 – fotNelton

関連する問題