2017-06-21 3 views
0

luigiでは、タスクが別のタスクに帰着すると、2番目のタスクが元のタスクの新しい依存関係になり、結果として得られたタスクが完了した後に元のタスクが再実行されることを理解しています。luigi:タスクは依存関係を作成せずに他のタスクを実行しますか?

ただし、特定のケースでは、私は、繰延-に従属関係になってタスクせずに、別のタスクに延期するタスクをしたいと思います。私は私の現在のタスクのrun方法は、他のタスクの完了後に再実行することにしたくないので、私はこれをしたい理由があります。

はい、私は私のrun方法が冪等する必要があることを知っています。それにもかかわらず、私は絶対に他のタスクに降伏した後にそのメソッドを2回実行したくない場合があります。

私はこれを行う方法を考え出したが、私はそれが最善の解決策だかはわからない、とあなたがたのうち、いずれかを持っている場合、私は、いくつかの提案をしたいと思います。 MainTaskOtherTask

は、次の2つのタスクがあると仮定します。 MainTaskは、さまざまなパラメータを使用してコマンドラインから呼び出されます。これらのパラメータの設定に応じて、MainTaskOtherTaskを起動することがあります。もしそうなら、私はrunメソッドMainTaskをもう一度起動したくありません。

class OtherTask(luigi.Task): 
    # Under some circumstances, this task can be invoked 
    # from the command line, and it can also be invoked 
    # in the normal luigi manner as a dependency for one 
    # or more other tasks. 
    # It also might be yielded to, as is done in the 
    # "run" method for `MainTask`, below. 

    id = luigi.parameter.IntParameter() 

    def complete(self): 
     # ... 
     # return True or False depending on various tests. 

    def requires(self): 
     # return [ ... various dependencies ... ] 

    def run(self): 
     # do stuff with self.id 
     # ... 
     with self.output().open('w') as f: 
      f.write('OK') 

    def output(self): 
     return '... something ...' 

class MainTask(luigi.Task): 
    # Parameters are expected to be supplied on the command line. 
    param1 = luigi.parameter.IntParameter() 
    param2 = luigi.parameter.BoolParameter() 
    # ... etc. ... 

    def run(self): 
     # 
     # Here's how I keep this "run" method from being 
     # invoked more than once. Is there a better way 
     # to invoke `OtherTask` without having it cause 
     # this current task to be re-invoked? 
     if self.complete(): 
      return 

     # Normal "run" processing for this task ... 
     # ... etc. ... 

     # Possibly run `OtherTask` multiple times, only if 
     # certain conditions are met ... 
     tasks = [] 
     if the_conditions_are_met: 
      ids = [] 
      # Append multiple integer ID's to the `ids` list. 
      # Calculate each ID depending upon the values 
      # passed in via self.param1, self.param2, etc. 
      # Do some processing depending on these ID's. 
      # ... etc. ... 

      # Then, create a list of tasks to be invoked, 
      # each one taking one of these ID's as a parameter. 
      for the_id in ids: 
       tasks.append(OtherTask(id=the_id)) 

     with self.output().open('w') as f: 
      f.write('OK') 

     # Optionally yield after marking this task as 
     # complete, so that when the yielded tasks have 
     # all run, this task's "run" method can test for 
     # completion and not re-run its logic. 
     if tasks: 
      yield tasks 

    def output(self): 
     return '... whatever ...'   
+0

、すべての得られたタスクが成功すると、それは 'MainTask'が最初に終了した後に真であると完了検査を強制するために、再実行する必要はありません場合は、この唯一の作品。タスクが失敗したにもかかわらず、再実行された場合は完了したように見えます。 – HippoMan

+0

私がこのケースを処理する最善の方法は、すべてのロジックを 'MainTask'の' run'メソッドから取り出して、新しい 'AuxiliaryTask'の' run'メソッドに入れることです。 'MainTask'への依存。 'AuxiliaryTask'は計算結果を出力し、' MainTask'はそれを単に入力として読み込み、 'OtherTask'に出力します。おそらくこれが適切に動作する唯一の方法ですが、私はまだ最初のタスクの 'run'メソッドを再呼び出しすることなく、あるタスクが別のタスクに遅れをとる方法を望んでいます。 – HippoMan

答えて

0

私のコメントによれば、補助クラスを使用すると効果があるようです。一度だけ実行され、メインクラスのrunメソッドが複数回呼び出されたとしても、再計算されずに補助クラスの出力データが再利用されます。実際

class OtherTask(luigi.Task): 
    # Under some circumstances, this task can be invoked 
    # from the command line, and it can also be invoked 
    # in the normal luigi manner as a dependency for one 
    # or more other tasks. 
    # It also might be yielded to, as is done in the 
    # "run" method for `MainTask`, below. 

    id = luigi.parameter.IntParameter() 

    def complete(self): 
     # ... 
     # return True or False depending on various tests. 

    def requires(self): 
     # return [ ... various dependencies ... ] 

    def run(self): 
     # do stuff with self.id 
     # ... 
     with self.output().open('w') as f: 
      f.write('OK') 

    def output(self): 
     return '... something ...' 

class AuxiliaryTask(luigi.Task): 

    def requires(self): 
     # return [ ... various dependencies ... ] 

    def run(self):     
     ids = [] 
     # Append multiple integer ID's to the `ids` list. 
     # Calculate each ID depending upon the values 
     # passed to this task via its parameters. Then ... 
     with self.output().open('w') as f: 
      f.write(json.dumps(ids)) 

    def output(self): 
     return '... something else ...' 

class MainTask(luigi.Task): 
    # Parameters are expected to be supplied on the command line. 
    param1 = luigi.parameter.IntParameter() 
    param2 = luigi.parameter.BoolParameter() 
    # ... etc. ... 

    def requires(self): 
     return [ self.clone(AuxiliaryTask) ] 

    def run(self): 
     # This method could get re-run after the yields, 
     # below. However, it just re-reads its input, instead 
     # of that input being recalculated. And in the second 
     # invocation, luigi's dependency mechanism will prevent 
     # any re-yielded-to tasks from repeating what they did 
     # before. 
     ids = [] 
     with self.input().open('r') as f: 
      ids = json.dumps(f.read()) 

     if ids: 
      tasks = [] 

      # Create a list of tasks to be invoked, 
      # each one taking one of these ID's as a parameter. 
      # Then, yield to each of these tasks. 
      for the_id in ids: 
       tasks.append(OtherTask(id=the_id)) 
      if tasks: 
       yield tasks 

     with self.output().open('w') as f: 
      f.write('OK') 


    def output(self): 
     return '... whatever ...'  
関連する問題