エアフローjinja2テンプレート用のカスタムフィルタを追加しようとしています。エアフローカスタムjinja2フィルタ
:
S3の私のフォルダが
/年/月/日/
のようなものですので、私の目的は、変数にyesterday_dsを使用することですが、このような画面s3://logs.web.com/AWSLogs/ {{yesterday_ds | get_year}}/{{yesterday_ds | get_month}}/{{yesterday_ds | GET_DAY}}/
私は(私がすでに合併していると思うた。)PRで見てきたあなたは、DAGオブジェクト作成中dag_argsパラメータのパラメータ「user_defined_filters」でこれを行うことができるhere
問題は、たとえそれを実行していても、例えば「get_yearという名前のフィルタがありません」ということです。
これは私のコードです:
dag.py
dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)
utils.py
def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['[email protected]']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}
def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)
私が間違っていますところ、誰もが見ていますか?ありがとうございました!ダグ定義した後、これを印刷
EDIT
は、残念ながら、ないカスタムフィルタを示していない:(。
jinja_env = dag.get_template_env()
print(jinja_env.filters)
また、私はDAGオブジェクトパラメータとして直接それを追加しようとした場合、としてそれは、テスト/ models.py @テストで示しています
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
EDIT 2
Ok私はバージョン1.8.0を持っていて、これにはフィルタがないことがわかります。誰もピップ経由で1.8.2rcをダウンロードする方法を知っていますか?それとも、私たちは傾ける?
驚くばかりです。ありがとう。どうすればそのメーリングリストに参加できますか? thx –
これは、ドキュメント(https://airflow.incubator.apache.org/project.html)のリソースとリンクの章で説明しています。 – faeder