2017-02-17 12 views
3

エアフローの使用を開始して以来、私と一緒にいてください。私がやろうとしていることは、BashOperatorタスクからリターンコードを収集し、それをローカル変数に保存してからその戻りコードは別のタスクに分岐します。私が持っている問題は、BashOperatorに何かを返す方法を考え出すことです。以下は、私のコードセグメントである:エアフローBashOperator回収コード

dag = DAG(dag_id='dag_1', 
     default_args=default_args, 
     schedule_interval='0 2 * * *', 
     user_defined_macros=user_def_macros, 
     dagrun_timeout=timedelta(minutes=60) 
    ) 
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag) 
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag) 
t2.set_upstream(oodas) 

私はxcom_pushをしようとしているが、正直、それがどのように動作するか分からないよ。これは結果を収集するための正しい方法ですか?ログでは、最後の行は戻りコード0で終了しました。

答えて

0

DAG全体を投稿できますか? (これはbashの事業者である場合)私は、あなたが行うことができますエアフローがタスク1から

をどのように機能するかを解釈する際に、あなたが問題を抱えていると思う:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag) 

とTask2の中:

tiがある
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag) 

task_instance変数と{{}}表記は変数セクションにアクセスするために使用されます

+0

もっとコードを更新しました。だから私はあなたが正しいと信じて、私は正しくそれを使用していない。私のタスクインスタンス "ti"はちょうど "oodas"ですか? BashOperatorがxcomに使用するキーが何であるかわからない –

関連する問題