2
DAG(DAG1)を使用しています。私は、コピーされた各ファイルの別のDAG(DAG2)を開始したいと思います。 DAG1の実行ごとにコピーされるファイルの数が異なるため、本質的にファイルをループしてDAG2を適切なパラメータで呼び出したいと思います。TriggerDagRunOperatorを使用して別のDAGを複数回実行する
例:
with DAG('DAG1',
description="copy files over",
schedule_interval="* * * * *",
max_active_runs=1
) as dag:
t_rsync = RsyncOperator(task_id='rsync_data',
source='/source/',
target='/destination/')
t_trigger_preprocessing = TriggerDagRunOperator(task_id='trigger_preprocessing',
trigger_daq_id='DAG2',
python_callable=trigger
)
t_rsync >> t_trigger_preprocessing
私はt_rsync
から関連XCOMデータを引き出し、その後、DAG2をトリガーするpython_callable trigger
を使用するように期待していました。しかし、私にはこれを行う方法が明確ではありません。と
class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
def execute(self, context):
count = 0
for dro in self.python_callable(context):
if dro:
with create_session() as session:
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
session.add(dr)
session.commit()
count = count + 1
else:
self.log.info("Criteria not met, moving on")
if count == 0:
raise AirflowSkipException('No external dags triggered')
:私はDAG2の内容を簡素化(ともmax_active_runs
と回路図を積み重ねる提供)するためにここにDAG2を呼び出すロジックを置くことを好むだろう