2016-12-29 11 views
1

Luigiがメソッド(実行、出力、必要)を実行する順序は何ですか?私は、タスクDAGの有効性をチェックするための最初のチェックとしてrequireが実行されているが、run()の後に出力を実行すべきではないと理解していますか?Luigiタスクメソッドの実行順序

私は実際にカフカのメッセージが実行されるのを待っていて、そのトリガーに基づいて他のタスクを実行してLocalTargetを返します。このように:

例外:パスまたはis_tmpがリターンLocalTarget(self.path)ラインで

を設定する必要があります

def run(self): 
    for message in self.consumer: 
     self.metadata_key = str(message.value, 'utf-8') 
     self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id) 
     if not os.path.exists(self.path): 
      os.mkdir(self.path) 

     with self.conn.cursor() as cursor: 
       all_accounts = cursor.execute('select domainname from tblaccountinfo;') 
     for each in all_accounts: 
      open(os.path.join(self.path,each)).close() 

def output(self): 
    return LocalTarget(self.path) 

しかし、私はというエラーを取得します。なぜluigiはdef run()が完了するまでdef output()メソッドを実行しようとしますか?

答えて

2

パイプライン(つまり1つ以上のタスク)を実行すると、Luigiは最初にその出力ターゲットがすでに存在するかどうかをチェックし、存在しない場合は実行するタスクをスケジュールします。

ルイージはどのようなターゲットをチェックする必要があるのか​​を知っていますか?それはあなたの仕事のoutput()メソッドと呼んでいます。