2017-12-24 15 views
5

エアフローでは、job_flow_idをemr-stepsの1つに渡す必要があるという問題に直面しています。私はオペレータからjob_flow_idを取得することができますが、クラスタに送信するステップを作成するときには、task_instanceの値が正しくありません。エアフロー - EMRオペレータのタスクインスタンス

def issue_step(name, args): 
    return [ 
     { 
      "Name": name, 
      "ActionOnFailure": "CONTINUE", 
      "HadoopJarStep": { 
       "Jar": "s3://....", 
       "Args": args 
      } 
     } 
    ] 

dag = DAG('example', 
      description='My dag', 
      schedule_interval='0 8 * * 6', 
      dagrun_timeout=timedelta(days=2)) 

try: 

    create_emr = EmrCreateJobFlowOperator(
     task_id='create_job_flow', 
     aws_conn_id='aws_default',   
     dag=dag 
    ) 

    load_data_steps = issue_step('load', ['arg1', 'arg2']) 

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id') 
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id 

    load_data = EmrAddStepsOperator(
     task_id='load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others 
     aws_conn_id='aws_default', 
     steps=load_data_steps, 
     dag=dag 
    ) 

    check_load_data = EmrStepSensor(
     task_id='watch_load_data', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    cluster_remover = EmrTerminateJobFlowOperator(
     task_id='remove_cluster', 
     job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", 
     aws_conn_id='aws_default', 
     dag=dag 
    ) 

    create_emr_recommendations >> load_data 
    load_data >> check_load_data 
    check_load_data >> cluster_remover 

except AirflowException as ae: 
    print ae.message 

問題は、私が代わりにload_dataステップで--cluster-id j-1234を見ての、EMRをチェックするとき、私は--cluster-id "{{task_instance.xcom_pull('create_job_flow', key='return_value')}}"を参照してください、ということです、私のステップが失敗する原因: は、私は、次のコードを持っています。

ステップ関数内で実際の値を取得するにはどうすればよいですか?

感謝と幸せな休日

+0

引用符を付けずに値を追加しようとしましたか? –

+0

ここで、 '' 'task_instance'''を取得しますか?load_data_steps [0] [" HadoopJarStep "] [" Args "]からのオブジェクト?私はまだそれを使用する方法を学んでいます。 – davideberdin

答えて

3

は、私は、気流リポジトリについてthisのPRがあることが分かりました。問題は、EmrAddStepsOperatorの手順のテンプレートがないことです。この問題を克服するために、私は次のようでした:

私のDAGで、新たにオペレータがここ

をというファイル

  • プラグイン
  • として EmrAddStepsOperator
  • を追加しましたから、この演算子を継承するカスタムオペレータを作成し
    • カスタム演算子のコードとファイル内のプラグインcustom_emr_add_step_operator.py(下記のツリーを参照)

      from __future__ import division, absolute_import, print_function 
      
      from airflow.plugins_manager import AirflowPlugin 
      from airflow.utils import apply_defaults 
      
      from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator 
      
      
      class CustomEmrAddStepsOperator(EmrAddStepsOperator): 
          template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above 
      
          @apply_defaults 
          def __init__(
            self, 
            *args, **kwargs): 
           super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs) 
      
          def execute(self, context): 
           super(CustomEmrAddStepsOperator, self).execute(context=context) 
      
      
      # Defining the plugin class 
      class CustomPlugin(AirflowPlugin): 
          name = "custom_plugin" 
          operators = [CustomEmrAddStepsOperator] 
      
      このように私のDAGファイルで

      私は、プラグインと呼ばれる

      from airflow.operators import CustomEmrAddStepsOperator 
      

      私のプロジェクトやプラグインの構造は次のようになります。あなたは、このようなPyCharm、この意志としてIDEを使用している場合

      ├── config 
      │   └── airflow.cfg 
      ├── dags 
      │   ├── __init__.py 
      │   └── my_dag.py 
      ├── plugins 
      │   ├── __init__.py 
      │   └── operators 
      │    ├── __init__.py 
      │    └── custom_emr_add_step_operator.py 
      └── requirements.txt 
      

      それはモジュールを見つけることができないと言うので、不平を言う。しかし、Airflowを実行すると、この問題は発生しません。 airflow.cfgの場合は、pluginsフォルダをポイントして、Airflowが新しく作成したプラグインを読み込めるようにしてください。