Airflowを使用して実装されたワークフローには、タスクA、B、C、およびDが含まれています。ワークフローでイベントCを待機させたいです。気流センサーでは、ある状態をポーリングして状態をチェックし、その状態が真であればワークフロー内の次のタスクがトリガーされます。私の要件はポーリングを避けることです。 Hereエアフローのトリガーとなるrest_apiエンドポイントを作成する気流に関するrest_api_pluginについての1つの回答 - このプラグインを使用してワークフローでタスクをトリガーすることができます。しかし私のワークフローでは、ポーリングなしでAPI呼び出し(非同期イベント)を待つタスクを実装したいと思います。残りのAPI要求を受け取るとタスクが起動され、Airflowワークフローが再開されます。エアフローを使用して実装されたワークフロー内のDAGのタスクで非同期イベントを待機する方法はありますか。
ポーリングを避ける理由:効率が悪く、要件に応じて拡張できません。
更新
私はFalseを返すセンサーを作成し、@Daniel黄での回答で述べた提案を行いました。このセンサは、タスクに実装されますstart_evaluating_cycle、今、このセンサーのタスクは常にFalseを返すが、何も検出されていません。
class WaitForEventSensor(BaseSensorOperator):
def poke(self, context):
return False
start_evaluating_cycle = WaitForEventSensor(
task_id="start_evaluating_cycle",
dag=dag,
poke_interval=60*60 # any number will do here, because it not polling just returning false
)
私はrest_api_pluginを設定し、私はタスクをマークしようとしているプラグインを使用して:start_evaluating_cyleワークフローを続行するには完了です。
rest_api_pluginが正常にタスクを実行し、私は、タスクが花使用して実行されたことがわかります。
しかし、ワークフローのタスクを:start_evaluating_cycleはまだ実行状態です:
rest_api_pluginは、ワークフローのタスク独立して実行されています。タスクをワークフロー内で実行するには、どのようにしてrest_api_pluginを作成できますか?ワークフローとは独立したものではありません。しかし
私は気流UI管理者からタスクを選択し、成功をマーク:
とするとき、私は確認して、ワークフローの進行をさらに、私が望むものですが、私は残りのAPIコールから成功をマークする必要があります。
私の懸念は、以下のとおりです。
rest_api_pluginを使用して成功したとして、ワークフロー内で実行中のタスクをマークする方法は?- 気流管理者が作成するURLを使用して、外部システムから呼び出すことで、 タスクを成功とマークすることはできますか?
私はあなたの提案を試みた、私は成功できなかった。問題の結果を@Daniel Huangに更新しました。 – javed
#1私はRest API Pluginに詳しくはわかりませんが、トリガするコマンドは正しく表示されます。正しいダグランを見ていますか? APIコール試行のスクリーンショットとUIマーク成功ウィンドウは、異なる実行日を参照します。 #2可能かもしれないが、私は試したことはないが、これらのエンドポイントはUIの外で使われることを意図していなかったので、これを最後の手段として使用するだろう。また、既存の実験的APIに追加することや、独自のAPIを作成してタスクインスタンスの状態のDB更新を行うこともできます。 –