私は気流が新です。現在、ETLパイプラインの私の会社では、Crontabとカスタムスケジューラ(社内で開発された)を使用しています。すべてのデータパイプ用のApacheエアフローを実装する予定です。そのために、unique_idを各タスクインスタンス/ Dagに対して見つけることができない機能を探索しています。ほとんどのソリューションがマクロとテンプレートで検索されましたが、どれもタスクのuniqueIDを提供していません。しかし、私は各タスクのUIでインクリメンタルなuniqueIDを見ることができます。私のPythonメソッド内でそれらの変数に簡単にアクセスする方法はありますか?主な使用例は、Python/ruby/Pentahoスクリプト/メソッドと呼ばれるジョブapacheのエアフロータスクのためのunique_idの取得
例
のために私のシェルスクリプトは、「test.sh」1がRUN_IDされ、他方がcollection_id 2つの引数を必要としています。現在、我々は、一元化されたデータベースから、このユニークなRUN_IDを生成し、我々が
from airflow.operators.bash_operator import BashOperator
from datetime import date, datetime, timedelta
from airflow import DAG
shell_command = "/data2/test.sh -r run_id -c collection_id"
putfiles_s3 = BashOperator(
task_id='putfiles_s3',
bash_command=shell_command,
dag=dag)
は(ユニークRUN_ID探していることを使用しようとしている気流コンテキストですでに存在している.IFジョブにそれを渡しているいずれかのダグ・レベル/タスクレベル)このダグ(スケジュール/手動)を実行している間に、実行ごとに実行されます。
注:これはサンプルタスクです。このDagには複数の依存タスクがあります。 気流UIからJOB_IDのスクリーンショットを添付
おかげ アヌープR
あなたのコードが含まれています –
あなたはUUIDを見ましたか? https://stackoverflow.com/questions/534839/how-to-create-a-guid-uuid-in-python#534851 –
@MicahElliottあなたの提案をありがとう。私たちはこのようなランダムなIDを生成することができますまたはシェルのランダムコマンドから。私は、気流そのものによって生成されたidを、job_idのように探していました。参照用にAirflow UIのスクリーンショットを添付しています。 –