2016-09-06 14 views
0

@daily間隔のスケジューリングとすべてのワークフローのタスクIDを持つDAGが作成されました。しかし、それは例外として実行されていません。このようにすることは可能ですか?特定のDAGの動的タスクを作成する他の方法はありますか?コマンドラインを使用して特定のタスクインスタンスを一時停止するには?動的に作成されたタスク/ダッグがApacheのエアフローで動作しない

from __future__ import print_function 
from builtins import range 
from airflow.operators import PythonOperator,DummyOperator,BranchPythonOperator,SqlSensor 
from airflow.models import DAG 
from datetime import datetime, timedelta 

import time 
from pprint import pprint 

seven_days_ago = datetime.combine(
     datetime.today() - timedelta(7), datetime.min.time()) 

args = { 
    'owner': 'varakumar', 
    'start_date': seven_days_ago, 
} 

dag = DAG(
    dag_id='dynamic_task_creation', default_args=args, 
    schedule_interval="@daily") 

def get_decision(): 
    return "right" 

start = DummyOperator(
    task_id='start', 
    dag=dag) 

td=datetime.today() 
x=str(datetime(td.year,td.month,td.day,td.hour,td.minute,td.second)).replace (" ", "_").replace (":", "-") 
pause_task_id = ("pause-%s" % x) 

pause = DummyOperator(
    task_id=pause_task_id, 
    dag=dag) 
pause.set_upstream(start) 

decision = BranchPythonOperator(
    task_id='decision', 
    python_callable=lambda: get_decision(), 
    dag=dag) 
decision.set_upstream(pause) 

left = DummyOperator(
    task_id='left', 
    dag=dag) 
left.set_upstream(decision) 

right = DummyOperator(
    task_id='right', 
    dag=dag) 
right.set_upstream(decision) 

私が見る最初の問題は、あなたがダイナミックstart_dateを使用していることである事前

+0

これはどのように動作すると思いますか、実際にどのように動作していますか? – Mercury

答えて

0

でいただきありがとうございます。私はそれを行うときにいくつかの奇妙な動作を見てきました、そして、私はそれが気流が過去のdagrunsのリストを維持する方法に基づいていると思う。固定start_dateを指定して、それが何かを解決するかどうかを確認してください。

どちらの場合でも、エアフロードキュメントadvise against dynamic start dates(ビットをスクロールダウンし、start_dateの説明を読んでください)。

EDIT:詳細については、thisもチェックしてください。

関連する問題