私はAirflow 1.8.1を使用しており、PostgreOperatorからのSQLリクエストの結果をプッシュしたいと思います。ここで気流:PostgreOperatorからxcom値をプッシュするには?
は私の仕事です:
check_task = PostgresOperator(
task_id='check_task',
postgres_conn_id='conx',
sql="check_task.sql",
xcom_push=True,
dag=dag)
def py_is_first_execution(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='check_task')
print 'count ----> ', value
if value == 0:
return 'next_task'
else:
return 'end-flow'
check_branch = BranchPythonOperator(
task_id='is-first-execution',
python_callable=py_is_first_execution,
provide_context=True,
dag=dag)
、ここでは私のsqlスクリプトである:私はcheck_task
からXCOM値をチェックするとき
select count(1) from table
それはnone
値を取得します。