2017-02-09 27 views
4

コードからflinkジョブを停止/キャンセルしたい状況に陥っています。これは、私のフリンクジョブにタスクを提出し、結果を確認する統合テストです。ジョブが非同期で実行されるため、テストが失敗してもパスしなくても停止しません。テストが終わった後、私は仕事をやりたいコードからApache Flinkジョブを取り消す

私は、以下のリストしていますいくつか試してみました:jobmanagerへ をキャンセルするリクエストを送信、

  1. がjobmanager俳優
  2. は、実行中の各ジョブの場合、ジョブ
  3. を実行ゲット

    これはもちろん実行されていませんが、jobmanagerのactorrefが間違っているのか、それとも他のものがないのか分かりません。

    エラーは次のとおりです。[flink-akka.actor.default-dispatcher-5] [akka:// flink/user/jobmanager_1]メッセージ[org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $ ]をアクタ[akka:// flink/temp/$ a]からアクタ[akka:// flink/user/jobmanager_1]に転送できませんでした。 [1]デッドレターが発生しました。このロギングは、構成設定「akka.log-dead-letters」と「akka.log-dead-letters-during-shutdown」を使用してオフまたは調整できます。

    これは、ジョブマネージャのアクターrefが間違っているか、送信されたメッセージが正しくありません。

    コードは、次のようになります。これは正しいアプローチであれば

    val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path 
    val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url 
    val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS)) 
        try { 
         val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS)) 
         if(result.isInstanceOf[RunningJobsStatus]){ 
         val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages() 
         val itr = runningJobs.iterator() 
         while(itr.hasNext){ 
          val jobId = itr.next().getJobId 
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS))); 
          try { 
          Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS)) 
          } 
          catch { 
          case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e 
          } 
    
         } 
         } 
        } 
        catch{ 
         case e : Exception => "Could not retrieve running jobs from the JobManager." + e 
        } 
    
        } 
    

    誰かがチェックすることはできますか?

    EDIT: ジョブを完全に停止するには、まずTaskManagerをJobManagerの順序でTaskManagerとJobManagerの順に停止する必要があります。

答えて

2

新しいActorSystemを作成してから、同じアクターシステムで/user/jobmanager_1という名前の俳優を見つけようとしています。実際のジョブマネージャは異なるActorSystemで実行されるため、これは機能しません。

あなたが本当のジョブマネージャにActorRefを取得したい場合は、あなたが選択のために同じActorSystemを使用する必要がどちらか(そして、あなたは、ローカルアドレスを使用することができます)か、ジョブマネージャの俳優のためのリモートアドレスを見つける必要があり。リモートアドレスの形式はakka.tcp://[email protected][address_of_actor_system]/user/jobmanager_[instance_number]です。 FlinkMiniClusterへのアクセス権がある場合は、leaderGateway約束を使用して、現在のリーダーのActorGatewayを取得することができます。

+0

こんにちは、あなたが指摘したことは正しいものでした。ジョブマネージャのリモートアドレスを取得する方法はありますか? FlinkMiniClusterは、環境によって直接作成されるため、アクセスできません。私は私の統合テストの一部であった仕事をやめたいと思っていました。今は私がFlinkMiniClusterを作成するコードを抽象化して、私がそれを制御して停止できるようにしました。 – Tej

+1

現時点では、Flink APIはそのような機能を提供していません。しかし、私たちは実行中のジョブをより明示的に制御できる 'JobClient'を導入することでその改善に取り組んでいます。これまでのところ、自分のFlinkMiniClusterを作成し、それに統合テストの仕事を提出することが、あなたの記述を達成する最良の方法です。 –

+0

おかげでたくさん! – Tej

関連する問題