Luigiがメソッド(実行、出力、必要)を実行する順序は何ですか?私は、タスクDAGの有効性をチェックするための最初のチェックとしてrequireが実行されているが、run()の後に出力を実行すべきではないと理解していますか?Luigiタスクメソッドの実行順序
私は実際にカフカのメッセージが実行されるのを待っていて、そのトリガーに基づいて他のタスクを実行してLocalTargetを返します。このように:
例外:パスまたはis_tmpがリターンLocalTarget(self.path)ラインで
を設定する必要があります
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
しかし、私はというエラーを取得します。なぜluigiはdef run()が完了するまでdef output()メソッドを実行しようとしますか?