2016-04-19 4 views
4

大量のサーバーデータを解析して、Redshiftデータベースにアップロードするジョブを作成しています。boto3(またはその他の方法)を使用してemrでpysparkジョブを自動化するにはどうすればよいですか?

次のように私の仕事の流れは次のとおりです。

  • グラブS3
  • ユーススパークdataframesまたはスパークSQLのいずれかからのログデータは、データを解析し、S​​3
  • に出書き戻す
  • からデータをアップロードしますS3からレッドシフトへ。

私のプロセスがEMRクラスタをスピンアップし、インストール用の正しいプログラムをブートストラップし、解析と書き込みのコードを含むPythonスクリプトを実行するように、これを自動化する方法が不安になっています。

誰でも、どのようにこれを行う方法を学ぶのを助けるために私と共有できる経験、チュートリアル、経験がありますか?

答えて

9

boto3 EMR docsを見てクラスタを作成してください。基本的にrun_job_flowに電話して、必要なプログラムを実行するステップを作成する必要があります。

import boto3  

client = boto3.client('emr', region_name='us-east-1') 

S3_BUCKET = 'MyS3Bucket' 
S3_KEY = 'spark/main.py' 
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY) 

# upload file to an S3 bucket 
s3 = boto3.resource('s3') 
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY) 

response = client.run_job_flow(
    Name="My Spark Cluster", 
    ReleaseLabel='emr-4.6.0', 
    Instances={ 
     'MasterInstanceType': 'm4.xlarge', 
     'SlaveInstanceType': 'm4.xlarge', 
     'InstanceCount': 4, 
     'KeepJobFlowAliveWhenNoSteps': True, 
     'TerminationProtected': False, 
    }, 
    Applications=[ 
     { 
      'Name': 'Spark' 
     } 
    ], 
    BootstrapActions=[ 
     { 
      'Name': 'Maximize Spark Default Config', 
      'ScriptBootstrapAction': { 
       'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config', 
      } 
     }, 
    ], 
    Steps=[ 
    { 
     'Name': 'Setup Debugging', 
     'ActionOnFailure': 'TERMINATE_CLUSTER', 
     'HadoopJarStep': { 
      'Jar': 'command-runner.jar', 
      'Args': ['state-pusher-script'] 
     } 
    }, 
    { 
     'Name': 'setup - copy files', 
     'ActionOnFailure': 'CANCEL_AND_WAIT', 
     'HadoopJarStep': { 
      'Jar': 'command-runner.jar', 
      'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/'] 
     } 
    }, 
    { 
     'Name': 'Run Spark', 
     'ActionOnFailure': 'CANCEL_AND_WAIT', 
     'HadoopJarStep': { 
      'Jar': 'command-runner.jar', 
      'Args': ['spark-submit', '/home/hadoop/main.py'] 
     } 
    } 
    ], 
    VisibleToAllUsers=True, 
    JobFlowRole='EMR_EC2_DefaultRole', 
    ServiceRole='EMR_DefaultRole' 
) 

あなたはジョブフローIDを知っている場合にも、実行中のクラスタにステップを追加することができますより多くの構成の場合

job_flow_id = response['JobFlowId'] 
print("Job flow ID:", job_flow_id) 

step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps) 

step_ids = step_response['StepIds'] 

print("Step IDs:", step_ids) 

を、sparkstepsをチェックしてください。

+0

私は、pythonファイルであるS3_KEY値の価値と意義を理解することができません。/は何ですか? –

+0

S3キーは、実行するPySparkファイル/ジョブです。手順の1つでは、それをS3からクラスタにコピーします。それはPythonファイルである必要はありません。 Scalaジョブを実行している場合はScalaになります。 – ksindi

関連する問題