2017-09-11 1 views
1

は、私はこのような、simplehttpoperatorをトリガーにしたい: 気流trigger_dag test_trigger --conf '{ "名": "何か"}' は

iがkwargsから[ 'dag_run' を使用してパラメータを受け入れるためにpythonoperatorのpython_callableを使用します] .confの、と私は[「dag_run」]を渡したい。simplehttpoperatorにconfに、どのように私はそれを行うことができますか?誰でも助けることができますか?タスク間通信のために

cc_ = {} 


def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    logging.info(cc_) 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=cc_, 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this) 

答えて

0

、あなたは、XCOMをチェックするhttps://airflow.incubator.apache.org/concepts.html#xcoms

をお勧めします***** ***** UPDATE
(おかげ詳細についてダニエル)以下 は、いくつかのコードです@Chengzhiと@Danielへ

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=json.loads("{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}"), 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 
+0

ただし、simplehttpoperatorでXCOMを使用するにはどうすればよいですか?どのケースコードも教えていただけますか?これは – pyfroggogogo

+0

@pyfroggogogo、私はいくつかのコード例を更新し、試してみてください。レンダリング後のバック辞書の種類にそれを得るために '|(「{tojson} {...}」)あなたは'データ=のjson.loadsを使用することができます。詳細については – Chengzhi

+1

テンプレートが文字列として渡さする必要が動作するかどうか –

0

感謝を:あなたは、あなたのSimpleHttpOperatorにあなたはXCOMを経由して戻り値を取得し、それを試してみることができます。気流にデフォルトJinja2のバージョンは2.8.1であるとJinja2のはバージョン2.9までは「tojson」という名前の組み込みフィルタが含まれていないため は、最後に私は、Jinja2の/ filter.pyでカスタムフィルタ「tojson」を書きました。

def do_tojson(value): 
    value = json.JSONEncoder().encode(value) 
    return value 

dagファイルでは、コードは次のとおりです。できます。

def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/task', 
    data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}", 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*", 
      "Content-Type": "application/json"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this)