2017-09-15 3 views
0

私は気流が新です。現在、ETLパイプラインの私の会社では、Crontabとカスタムスケジューラ(社内で開発された)を使用しています。すべてのデータパイプ用のApacheエアフローを実装する予定です。そのために、unique_idを各タスクインスタンス/ Dagに対して見つけることができない機能を探索しています。ほとんどのソリューションがマクロとテンプレートで検索されましたが、どれもタスクのuniqueIDを提供していません。しかし、私は各タスクのUIでインクリメンタルなuniqueIDを見ることができます。私のPythonメソッド内でそれらの変数に簡単にアクセスする方法はありますか?主な使用例は、Python/ruby​​/Pentahoスクリプト/メソッドと呼ばれるジョブapacheのエアフロータスクのためのunique_idの取得

のために私のシェルスクリプトは、「test.sh」1がRUN_IDされ、他方がcollection_id 2つの引数を必要としています。現在、我々は、一元化されたデータベースから、このユニークなRUN_IDを生成し、我々が

from airflow.operators.bash_operator import BashOperator 
from datetime import date, datetime, timedelta 
from airflow import DAG 

shell_command = "/data2/test.sh -r run_id -c collection_id" 


putfiles_s3 = BashOperator(
       task_id='putfiles_s3', 
       bash_command=shell_command, 
       dag=dag) 

は(ユニークRUN_ID探していることを使用しようとしている気流コンテキストですでに存在している.IFジョブにそれを渡しているいずれかのダグ・レベル/タスクレベル)このダグ(スケジュール/手動)を実行している間に、実行ごとに実行されます。

注:これはサンプルタスクです。このDagには複数の依存タスクがあります。 気流UIからJOB_IDのスクリーンショットを添付 enter image description here

おかげ アヌープR

+0

あなたのコードが含まれています –

+0

あなたはUUIDを見ましたか? https://stackoverflow.com/questions/534839/how-to-create-a-guid-uuid-in-python#534851 –

+0

@MicahElliottあなたの提案をありがとう。私たちはこのようなランダムなIDを生成することができますまたはシェルのランダムコマンドから。私は、気流そのものによって生成されたidを、job_idのように探していました。参照用にAirflow UIのスクリーンショットを添付しています。 –

答えて

1

{{ ti.job_id }}何をしたいです:

from datetime import datetime, timedelta 
from airflow.operators.bash_operator import BashOperator 
from airflow import DAG 


dag = DAG(
    "job_id", 
    start_date=datetime(2018, 1, 1), 
) 

with dag: 
    BashOperator(
     task_id='unique_id', 
     bash_command="echo {{ ti.job_id }}", 
    ) 

これは、実行時に有効になります。

[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj 
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4 
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output: 
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4 

WebUIの「レンダリングテンプレート」ビューではなく、数のいずれもが表示されませんので、これが唯一、実行時に有効とされることに注意してください:この実行のログは次のようになります。

+0

{{ti.job_id}}私はどの演算子でも使うことができ、またPythonメソッドの引数として渡すこともできますか?気にしない場合この値をPythonメソッドに渡す例を教えてください Thanks Ash Berlin-テイラー 印刷が@Ashのです をti.job_id TI = kwargsから[ 'TIの']」コンテキストからアクセスするインスタンス変数の 印刷: –

+0

私はPythonのメソッドに デフtest_failure(** kwargsからは)同じことを通過させるためのソリューションを持って"task_instance"を介して利用可能な値が何であるかを示すドキュメントがあります。このURLは "ti"について多くのことを説明していません。https://pythonhosted.org/airflow/code.html#macros –

+0

"ti"はTaskInstance https://pythonhosted.org/airflow/code.html#airflow.models.TaskInstanceですが、そのオブジェクトのプロパティはドキュメント化されていないため、コードに行きます。 –

関連する問題