2017-11-23 5 views
3

Airflowを使用して実装されたワークフローには、タスクA、B、C、およびDが含まれています。ワークフローでイベントCを待機させたいです。気流センサーでは、ある状態をポーリングして状態をチェックし、その状態が真であればワークフロー内の次のタスクがトリガーされます。私の要件はポーリングを避けることです。 Hereエアフローのトリガーとなるrest_apiエンドポイントを作成する気流に関するrest_api_pluginについての1つの回答 - このプラグインを使用してワークフローでタスクをトリガーすることができます。しかし私のワークフローでは、ポーリングなしでAPI呼び出し(非同期イベント)を待つタスクを実装したいと思います。残りのAPI要求を受け取るとタスクが起動され、Airflowワークフローが再開されます。エアフローを使用して実装されたワークフロー内のDAGのタスクで非同期イベントを待機する方法はありますか。

ポーリングを避ける理由:効率が悪く、要件に応じて拡張できません。

更新

私はFalseを返すセンサーを作成し、@Daniel黄での回答で述べた提案を行いました。このセンサは、タスクに実装されますstart_evaluating_cycle、今、このセンサーのタスクは常にFalseを返すが、何も検出されていません。

class WaitForEventSensor(BaseSensorOperator): 

    def poke(self, context): 
     return False 

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle", 
    dag=dag, 
    poke_interval=60*60 # any number will do here, because it not polling just returning false 
) 

enter image description here

私はrest_api_pluginを設定し、私はタスクをマークしようとしているプラ​​グインを使用して:start_evaluating_cyleワークフローを続行するには完了です。

  • enter image description here

rest_api_pluginが正常にタスクを実行し、私は、タスクが花使用して実行されたことがわかります。

  • enter image description here

しかし、ワークフローのタスクを:start_evaluating_cycleはまだ実行状態です:

  • List item

rest_api_pluginは、ワークフローのタスク独立して実行されています。タスクをワークフロー内で実行するには、どのようにしてrest_api_pluginを作成できますか?ワークフローとは独立したものではありません。しかし

私は気流UI管理者からタスクを選択し、成功をマーク:

  • List item

それはこのURLに私を取る:http://localhost:8080/admin/airflow/success?task_id=start_evaluating_cycle&dag_id=faculty_evaluation_workflow&upstream=false&downstream=false&future=false&past=false&execution_date=2017-11-26T06:48:54.297289&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8

とするとき、私は確認して、ワークフローの進行をさらに、私が望むものですが、私は残りのAPIコールから成功をマークする必要があります。

私の懸念は、以下のとおりです。


  1. rest_api_pluginを使用して成功したとして、ワークフロー内で実行中のタスクをマークする方法は?
  2. 気流管理者が作成するURLを使用して、外部システムから呼び出すことで、 タスクを成功とマークすることはできますか?

答えて

3

可能な解決策の1つは、外部の何かが手動で状態を成功に設定するまで永遠に待機するセンサーを使用することです。

だから、ダミーセンサーのいくつかの並べ替えを持っていると思います:

次のように初期化さ
class DummySensor(BaseSensorOperator): 

    def poke(self, context): 
     return False 

task_c = DummySensor(
    dag=dag, 
    task_id='task_c', 
    interval=600, # something fairly high since we're not polling for anything, just to check when to timeout 
    timeout=3600, # if nothing externally sets state to success in 1 hour, task will fail so task D does not run 
) 

タスクCが起動すると、それだけでRUNNING状態で座ってます。次に、REST APIプラグインを使用して、条件が満たされたときにタスクCの状態をSUCCESSに設定できます。タスクDと他の下流のタスクが開始されます。

ダミーセンサは何もしないで待機している間も、作業者のスロットにまだ保持されています。

+0

私はあなたの提案を試みた、私は成功できなかった。問題の結果を@Daniel Huangに更新しました。 – javed

+0

#1私はRest API Pluginに詳しくはわかりませんが、トリガするコマンドは正しく表示されます。正しいダグランを見ていますか? APIコール試行のスクリーンショットとUIマーク成功ウィンドウは、異なる実行日を参照します。 #2可能かもしれないが、私は試したことはないが、これらのエンドポイントはUIの外で使われることを意図していなかったので、これを最後の手段として使用するだろう。また、既存の実験的APIに追加することや、独自のAPIを作成してタスクインスタンスの状態のDB更新を行うこともできます。 –

関連する問題