2017-11-03 10 views
2

DAG(DAG1)を使用しています。私は、コピーされた各ファイルの別のDAG(DAG2)を開始したいと思います。 DAG1の実行ごとにコピーされるファイルの数が異なるため、本質的にファイルをループしてDAG2を適切なパラメータで呼び出したいと思います。TriggerDagRunOperatorを使用して別のDAGを複数回実行する

例:

with DAG('DAG1', 
     description="copy files over", 
     schedule_interval="* * * * *", 
     max_active_runs=1 
    ) as dag: 


    t_rsync = RsyncOperator(task_id='rsync_data', 
     source='/source/', 
     target='/destination/') 

    t_trigger_preprocessing = TriggerDagRunOperator(task_id='trigger_preprocessing', 
     trigger_daq_id='DAG2', 
     python_callable=trigger 

    ) 

    t_rsync >> t_trigger_preprocessing 

私はt_rsyncから関連XCOMデータを引き出し、その後、DAG2をトリガーするpython_callable triggerを使用するように期待していました。しかし、私にはこれを行う方法が明確ではありません。と

class TriggerMultipleDagRunOperator(TriggerDagRunOperator): 
    def execute(self, context): 
     count = 0 
     for dro in self.python_callable(context): 
      if dro: 
       with create_session() as session: 
        dbag = DagBag(settings.DAGS_FOLDER) 
        trigger_dag = dbag.get_dag(self.trigger_dag_id) 
        dr = trigger_dag.create_dagrun(
         run_id=dro.run_id, 
         state=State.RUNNING, 
         conf=dro.payload, 
         external_trigger=True) 
        session.add(dr) 
        session.commit() 
        count = count + 1 
      else: 
       self.log.info("Criteria not met, moving on") 
     if count == 0: 
      raise AirflowSkipException('No external dags triggered') 

:私はDAG2の内容を簡素化(ともmax_active_runsと回路図を積み重ねる提供)するためにここにDAG2を呼び出すロジックを置くことを好むだろう

答えて

1

は、私自身のオペレータを書いてしまいました

:と一緒にすべてを結びつける、その後

def trigger_preprocessing(context): 
    for base_filename,_ in found.items(): 
     exp = context['ti'].xcom_pull(task_ids='parse_config', key='experiment') 
     run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat()) 
     dro = DagRunOrder(run_id=run_id) 
     d = { 
      'directory': context['ti'].xcom_pull(task_ids='parse_config', key='experiment_directory'), 
      'base': base_filename, 
      'experiment': exp['name'], 
     } 
     LOG.info('triggering dag %s with %s' % (run_id,d)) 
     dro.payload = d 
     yield dro 
    return 

などpython_callable

t_trigger_preprocessing = TriggerMultipleDagRunOperator(task_id='trigger_preprocessing', 
    trigger_dag_id='preprocessing', 
    python_callable=trigger_preprocessing 
) 
関連する問題