2017-12-16 23 views
0

私はAirflowを初めて使用しています。私は私が私のDAGを参照してくださいWebサーバーのインタフェースを使用する場合エアフローの出力を参照

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 
import datetime 

DAG = DAG(
    dag_id='example_dag', 
    start_date=datetime.datetime.now(), 
    schedule_interval='@once' 
) 

def push_function(**kwargs): 
    ls = ['a', 'b', 'c'] 
    return ls 

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function, 
    provide_context=True, 
    dag=DAG) 

def pull_function(**kwargs): 
    ti = kwargs['ti'] 
    ls = ti.xcom_pull(task_ids='push_task') 
    with open('test.txt','w') as out: 
     out.write(ls) 
    out.close() 

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function, 
    provide_context=True, 
    dag=DAG) 

push_task >> pull_task 

:私は以下のようにtxtファイルにリストを保存するための簡単なコードを書きました。また、私はairflow list_dagsCLIに書きました。

私もpython code.pyを使用して私のコードをコンパイルし、その結果がエラーなしで以下のようなものだった:

[2017-12-16 14:21:30,609] {__init__.py:57} INFO - Using executor SequentialExecutor 
[2017-12-16 14:21:30,709] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt 
[2017-12-16 14:21:30,741] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt 

私は両方とも私がすることはできません、しかしairflow trigger_dag Mydag

UIで、コマンドとダグを実行しようとしました実行後に私のtxt結果ファイルを見てください。ログファイルにもエラーはありません。

私のtxtファイルはどのようにして見つけることができますか?

答えて

0

絶対ファイルパスを使用してもう一度試してみるか、ファイルの検索に役立つように、メソッド内に現在の作業ディレクトリをos.getcwd()で記録してみることができます。

関連する問題