2016-04-26 18 views
0

MRJobは、各ジョブが完了するまで待機してから、ユーザーに制御を戻します。大きなEMRのステップを小さなものに分割し、それらをすべてワンショットで提出したいと思います。MRJobを使用してEMRクラスタにジョブを送信

ドキュメントはprogrammatically submitting tasksについて話しますが、サンプルコードでは、ジョブの完了を待っています(runner.run()コマンドはblocks until the job is completeです)。

また、EMRには256のアクティブジョブの制限がありますが、ループしたり、接続されたコンソールで出力を取得するのではなく、256個のジョブをどのように満たしていますか?

答えて

0

試してみると、次のようなことが出てきます。

私の最初の試行では、端末が切り離されたときにサブミットされたジョブが(bashスクリプトで)ジョブをサブミットして終了させるということに気づいたとき。しかし、AWSがEMRへの呼び出しを抑制し、したがってジョブのいくつかが提出される前に殺されたため、これはうまく機能しませんでした。

現在のベストソリューション

from jobs import MyMRJob 
import logging 

logging.basicConfig(
    level=logging.INFO, 
    format = '%(asctime)-15s %(levelname)-8s %(message)s', 
) 
log = logging.getLogger('submitjobs') 

def main(): 
    cluster_id="x-MXMXMX" 
    log.info('Cluster: %s', cluster_id) 
    for i in range(10): 
     n = '%04d' % i 
     log.info('Adding job: %s', n) 
     mr_job = MyMRJob(args=[ 
      '-r', 'emr', 
      '--conf-path', 'mrjob.conf', 
      '--no-output', 
      '--output-dir', 's3://mybucket/mrjob/%s' % n, 
      '--cluster-id', cluster_id, 
      'input/file.%s' % n 
    ]) 
    runner = mr_job.make_runner() 
    # the following is the secret sauce, submits the job and returns 
    # it is a private method though, so may be changed without notice 
    runner._launch() 

if __name__ == '__main__': 
    main() 
関連する問題