2016-03-28 3 views
5

私はspark apacheの仕事が完了するまで待っているが、成功していないときに "while(true)"の解決策を避けようとしています。別のアプリケーションから起動する際にapache spark launcherのジョブを正しく待つ方法はありますか?

私はいくつかのデータを処理して結果をデータベースに格納するスパークアプリケーションを持っています。私はスプリングサービスから呼び出して、ジョブが完了するまで待つつもりです。

例:メソッドと

ランチャー:ハンドラの状態は "完成" されるまで適切に待機する

@Override 
public void run(UUID docId, String query) throws Exception { 
    launcher.addAppArgs(docId.toString(), query); 

    SparkAppHandle sparkAppHandle = launcher.startApplication(); 

    sparkAppHandle.addListener(new SparkAppHandle.Listener() { 
     @Override 
     public void stateChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 

     @Override 
     public void infoChanged(SparkAppHandle handle) { 
      System.out.println(handle.getState() + " new state"); 
     } 
    }); 

    System.out.println(sparkAppHandle.getState().toString()); 
} 

+0

これを解決できましたか? – gaurav5430

答えて

2

また、SpringアプリケーションのSparkLauncherも使用しています。以下は、私が取ったアプローチの要約です(JavaDocの例を参照)。

ジョブを起動するために使用される@Serviceは、SparkHandle.Listenerも実装し、.startApplicationなどを介して自身への参照を渡します。このアプローチを使用

... 
... 
@Service 
public class JobLauncher implements SparkAppHandle.Listener { 
... 
... 
... 
private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception { 

    String appResource = getAppResourceName(); 

    SparkAppHandle handle = new SparkLauncher() 
     .setAppResource(appResource).addAppArgs(args) 
     .setMainClass(mainClass) 
     .setMaster(sparkMaster) 
     .setDeployMode(sparkDeployMode) 
     .setSparkHome(sparkHome) 
     .setConf(SparkLauncher.DRIVER_MEMORY, "2g") 
     .startApplication(this); 

    LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]"); 

    return handle; 
} 

/** 
* Callback method for changes to the Spark Job 
*/ 
@Override 
public void infoChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed. State [" + handle.getState() + "]"); 

} 

/** 
* Callback method for changes to the Spark Job's state 
*/ 
@Override 
public void stateChanged(SparkAppHandle handle) { 

    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]"); 

} 

は、状態の変化が「FAILED」したときに1が行動を取ることができ、「完了」または「KILLED」。

この情報がお役に立てば幸いです。

+0

私も同じ問題に直面しています。私はOPを使って(新しい匿名のリスナーオブジェクトを作成する)、あなたが記述する方法を試してみました。どちらの場合も、リスナー・メソッドは呼び出されませんでした。 – Reddy

+0

@Reddy:これを手に入れることができましたか?私も同じ問題に直面しています。アプリケーションは即座に終了し、appIDと状態を取得します。明示的にThread.sleep()を呼び出すと動作します。ありがとうございました! – Tariq

1

CountDownLatchを使用して実装しましたが、期待どおりに動作します。

... 
final CountDownLatch countDownLatch = new CountDownLatch(1); 
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); 
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); 
Thread sparkAppListenerThread = new Thread(sparkAppListener); 
sparkAppListenerThread.start(); 
long timeout = 120; 
countDownLatch.await(timeout, TimeUnit.SECONDS);  
    ... 

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { 
    private static final Log log = LogFactory.getLog(SparkAppListener.class); 
    private final CountDownLatch countDownLatch; 
    public SparkAppListener(CountDownLatch countDownLatch) { 
     this.countDownLatch = countDownLatch; 
    } 
    @Override 
    public void stateChanged(SparkAppHandle handle) { 
     String sparkAppId = handle.getAppId(); 
     State appState = handle.getState(); 
     if (sparkAppId != null) { 
      log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " 
        + SPARK_STATE_MSG.get(appState)); 
     } else { 
      log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); 
     } 
     if (appState != null && appState.isFinal()) { 
      countDownLatch.countDown(); 
     } 
    } 
    @Override 
    public void infoChanged(SparkAppHandle handle) {} 
    @Override 
    public void run() {} 
} 
関連する問題