XComによって1つのタスクから別のタスクに文字列のリストを渡そうとしていますが、プッシュされたリストをリストとして解釈して戻すことはできません。例えば気流の従属タスクのパラメータとして文字列のリストを渡します
、私はShortCircuitOperator
で実行されるいくつかの機能blah
で次の操作を行います。
paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)
、その後、私はオペレータのパラメータとして、このようなリストを使用します。例えば、
run_task_after_blah = AfterBlahOperator(
task_id='run-task-after-blah',
...,
input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
...,
)
私はinput_paths
がpaths
に等しくなるように期待していますが、レンダリングはモミ、その後、割り当てを起きていないため、ややテンプレートのレンダリングは、その後の私の文字列化リストにxcom_pull
リターンを変換する(とAfterBlahOperator
インサートは、私はいくつかのセパレータで分離されpaths
一つに文字列を連結し、XCOMにそれを押し、XCOMからではなく、XCOMとして引っ張ったときに、そのバック分割しようとしたJSONの要素の値として。
割り当て最初にレンダリングされる、私は得るその文字列化テンプレート内でsplit
関数が呼び出されたときのリスト、またはsplit
関数が"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')
のように適用されている場合は、元の連結文字列paths
がリストされます。
XComは、抽出された値をさらに処理することができるが、タスクのパラメータとして「1」に変換するためのmultiple_valuesではなく、複数の値をタスクパラメータまたは複数の値として使用することができます。
このような文字列のリストを正確に返す特別な関数を記述することなく、これを行う方法はありますか? また、XComをあまりにも酷使しているかもしれませんが、Airflowには要素のリストをパラメータとして持つ演算子が多数あります(たとえば、前のタスクの結果であり、あらかじめわかっていない複数のファイルへのフルパスなど)。
ありがとうございます!私はあまりにも多くのことを望んでいた!しかし、はい、 '操作'自体を持つカスタム演算子は、行く方法です。私はちょうど余分なコードを避けたいと思っていたが、恐らくこれはJinjaを使用することによるトレードオフである。 – Guille