2017-11-01 1 views
1

XComによって1つのタスクから別のタスクに文字列のリストを渡そうとしていますが、プッシュされたリストをリストとして解釈して戻すことはできません。例えば気流の従属タスクのパラメータとして文字列のリストを渡します

、私はShortCircuitOperatorで実行されるいくつかの機能blahで次の操作を行います。

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list] 
kwargs['ti'].xcom_push(key='return_value', value=full_paths) 

、その後、私はオペレータのパラメータとして、このようなリストを使用します。例えば、

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah', 
    ..., 
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}", 
    ..., 
) 

私はinput_pathspathsに等しくなるように期待していますが、レンダリングはモミ、その後、割り当てを起きていないため、ややテンプレートのレンダリングは、その後の私の文字列化リストにxcom_pullリターンを変換する(とAfterBlahOperatorインサートは、私はいくつかのセパレータで分離されpaths一つに文字列を連結し、XCOMにそれを押し、XCOMからではなく、XCOMとして引っ張ったときに、そのバック分割しようとしたJSONの要素の値として。

割り当て最初にレンダリングされる、私は得るその文字列化テンプレート内でsplit関数が呼び出されたときのリスト、またはsplit関数が"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')のように適用されている場合は、元の連結文字列pathsがリストされます。

XComは、抽出された値をさらに処理することができるが、タスクのパラメータとして「1」に変換するためのmultiple_valuesではなく、複数の値をタスクパラメータまたは複数の値として使用することができます。

このような文字列のリストを正確に返す特別な関数を記述することなく、これを行う方法はありますか? また、XComをあまりにも酷使しているかもしれませんが、Airflowには要素のリストをパラメータとして持つ演算子が多数あります(たとえば、前のタスクの結果であり、あらかじめわかっていない複数のファイルへのフルパスなど)。

答えて

2

Jinjaは文字列をレンダリングするので、テンプレートを使用してXComをフェッチすると、常に文字列になります。代わりに、TaskInstanceオブジェクトにアクセスできるXComを取得する必要があります。このような何か:

class AfterBlahOperator(BaseOperator): 

    def __init__(self, ..., input_task_id, *args, **kwargs): 
     ... 
     self.input_task_id = input_task_id 
     super(AfterBlahOperator, self).__init__(*args, **kwargs) 

    def execute(self, context): 
     input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id) 
     for path in input_paths: 
      ... 

これは、あなたがXCom docsは、例を提供PythonOperator、以内にそれをフェッチだろうかと似ています。

DAGでハードコードできる場合は、別のinput_pathsパラメータをサポートすることができますが、値を読み取るパラメータを確認するには追加のチェックが必要です。

+0

ありがとうございます!私はあまりにも多くのことを望んでいた!しかし、はい、 '操作'自体を持つカスタム演算子は、行く方法です。私はちょうど余分なコードを避けたいと思っていたが、恐らくこれはJinjaを使用することによるトレードオフである。 – Guille

関連する問題