2016-07-25 7 views
6

気流の従属タスクにパラメータを渡す方法は何ですか?私は多くのbashesファイルを持っており、私はこの手法をエアフローに移行しようとしていますが、タスク間でいくつかのプロパティを渡す方法はわかりません。気流パラメータを従属タスクに渡す

これは実際の例です:私はT1で作成したディレクトリ名にアクセスする必要がT2には

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



#Task of extraction in EMR 
t1 = BashOperator(
     task_id='extract_account', 
     bash_command=sqoop_template, 
     params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, 
     dag=dag) 
#Task to upload in s3 backup. 
t2 = BashOperator(
     task_id='s3_upload', 
     bash_command=s3_template, 
     params={}, #here i need the dir name created in t1 
     depends_on_past=True 
    ) 

t2.set_upstream(t1) 

改善は歓迎されているので、ない決定的な解決策であるソリューション

#Execute a valid job sqoop 
def sqoop_import(table_name, job_name): 
    s3, hdfs = dirpath(table_name) 
    sqoop_job = job_default_config(job_name, hdfs) 
    #call(sqoop_job) 
    return {'hdfs_dir': hdfs, 's3_dir': s3} 

def s3_upload(**context): 
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir'] 
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir'] 
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] 
    #call(s3_cpdist_job) 
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import') 

def sns_notify(**context): 
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir'] 
    client = boto3.client('sns') 
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg' 
    response = client.publish(TargetArn=arn, Message=s3) 
    return response 

。ありがとう。

+0

私の意見では、1つの解決策は、t1で作成されたプロパティでファイルを作成し、t2で同じファイルを使用することです。 –

答えて

8

XComs - http://airflow.incubator.apache.org/concepts.html#xcomsをご覧ください。これらは、タスク間で状態を通信するために使用されます。

+0

私はこのアプローチを使用して解決しますが、ここで解決方法を完全に忘れてしまいます。ありがとう。 –

関連する問題