2017-10-18 6 views
1

私はExternalTask​​Sensorを使用しようとしていますが、既に正常に完了した別のDAGのタスクを突き刺してしまいます。Airflow ExternalTask​​Sensorが固着する

ここでは、最初のDAG「a」がそのタスクを完了し、その後、ExternalTask​​Sensorを介して2番目のDAG「b」がトリガされると想定されます。代わりに、a.first_taskのために突き刺さってしまいます。

まずDAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 

dag = DAG(
    dag_id='a', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_first_task(): 
    print('First task is done') 

PythonOperator(
    task_id='first_task', 
    python_callable=do_first_task, 
    dag=dag) 

第二DAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
from airflow.operators.sensors import ExternalTaskSensor 

dag = DAG(
    dag_id='b', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_second_task(): 
    print('Second task is done') 

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed', 
    external_dag_id='a', 
    external_task_id='first_task', 
    dag=dag) >> \ 
PythonOperator(
    task_id='second_task', 
    python_callable=do_second_task, 
    dag=dag) 

私はここで何をしないのですか?

答えて

1

ExternalTaskSensorは、同じ実行日のdagで実行されているタスクに依存していることを前提としています。

これは、abのダッグを同じスケジュール(たとえば、毎日午前9時またはw/e)で実行する必要があることを意味します。

そうでない場合は、ExternalTaskSensorをインスタンス化するときにexecution_deltaまたはexecution_date_fnを使用する必要があります。ここで

さらに明確化を支援するオペレータ自体の内部文書である:

:param execution_delta: time difference with the previous execution to 
    look at, the default is the same execution_date as the current task. 
    For yesterday, use [positive!] datetime.timedelta(days=1). Either 
    execution_delta or execution_date_fn can be passed to 
    ExternalTaskSensor, but not both. 

:type execution_delta: datetime.timedelta 


:param execution_date_fn: function that receives the current execution date 
    and returns the desired execution date to query. Either execution_delta 
    or execution_date_fn can be passed to ExternalTaskSensor, but not both. 

:type execution_date_fn: callable 
関連する問題