2017-06-21




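I've come up with a way to do this, but I'm not sure it's the best solution, and I'd welcome suggestions if any of you have one.

Suppose there are two tasks, `MainTask` and `OtherTask`. `MainTask` is invoked from the command line with various parameters. Depending on how those parameters are set, `MainTask` may launch `OtherTask`. If it does, I don't want the `run` method of `MainTask` to be invoked again.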
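Why would `run` be invoked again at all? As far as I understand Luigi's worker, when `run()` yields tasks that are not yet complete, the worker abandons that generator, schedules the yielded tasks, and later calls `run()` again from the beginning, so everything before the `yield` executes a second time. The sketch below is a stdlib-only toy model of that behaviour, not Luigi's scheduler code; `ToyTask`, `Main`, `toy_worker` and the `body_runs` counter are hypothetical names, and a boolean `done` flag stands in for an output file.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: `done` plays the role of the output."""

    def __init__(self) -> None:
        self.done = False

    def complete(self) -> bool:
        return self.done

    def run(self):
        self.done = True  # a plain (non-generator) run() finishes in one call
        return None


class Main(ToyTask):
    """Like MainTask: does some work, then yields dynamic dependencies."""

    def __init__(self) -> None:
        super().__init__()
        self.body_runs = 0
        self.children = [ToyTask(), ToyTask()]

    def run(self):
        self.body_runs += 1    # the work we would rather not repeat
        yield self.children    # dynamic dependencies
        self.done = True


def toy_worker(task: ToyTask) -> None:
    """Simplified model of how a worker drives a generator-style run()."""
    while True:
        gen = task.run()
        if gen is None:        # non-generator run() has already finished
            return
        try:
            deps = next(gen)
            while all(d.complete() for d in deps):
                deps = gen.send(None)  # deps done: resume the same generator
        except StopIteration:
            return             # run() reached its end
        for d in deps:         # deps missing: abandon this generator and run them
            if not d.complete():
                toy_worker(d)
        # loop again, which calls task.run() from the top
```

Running `toy_worker(Main())` executes the body of `Main.run` twice: once before the children exist, and once more after they finish, when the now-complete dependencies let the generator run to its end.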

import luigi


class OtherTask(luigi.Task):
    # Under some circumstances, this task can be invoked
    # from the command line, and it can also be invoked
    # in the normal luigi manner as a dependency for one
    # or more other tasks.
    # It also might be yielded to, as is done in the
    # "run" method for `MainTask`, below.

    id = luigi.parameter.IntParameter()

    def complete(self):
        # ...
        # Return True or False depending on various tests.
        # (Placeholder: fall back to Luigi's default "does output() exist" check.)
        return super().complete()

    def requires(self):
        # return [ ... various dependencies ... ]
        return []

    def run(self):
        # do stuff with self.id
        # ...
        with self.output().open('w') as f:
            f.write(str(self.id))  # placeholder: the real output isn't shown

    def output(self):
        # output() must return a Target, not a plain string.
        return luigi.LocalTarget('... something ...')

class MainTask(luigi.Task): 
    # Parameters are expected to be supplied on the command line. 
    param1 = luigi.parameter.IntParameter() 
    param2 = luigi.parameter.BoolParameter() 
    # ... etc. ... 

    def run(self):
        # Here's how I keep this "run" method from being
        # invoked more than once. Is there a better way
        # to invoke `OtherTask` without having it cause
        # this current task to be re-invoked?
        if self.complete():
            return

        # Normal "run" processing for this task ...
        # ... etc. ...

        # Possibly run `OtherTask` multiple times, only if
        # certain conditions are met ...
        tasks = []
        if the_conditions_are_met:  # placeholder for the real condition
            ids = []
            # Append multiple integer ID's to the `ids` list.
            # Calculate each ID depending upon the values
            # passed in via self.param1, self.param2, etc.
            # Do some processing depending on these ID's.
            # ... etc. ...

            # Then, create a list of tasks to be invoked,
            # each one taking one of these ID's as a parameter.
            for the_id in ids:
                tasks.append(OtherTask(id=the_id))

        with self.output().open('w') as f:
            f.write('done')  # placeholder: the real output isn't shown

        # Optionally yield after marking this task as
        # complete, so that when the yielded tasks have
        # all run, this task's "run" method can test for
        # completion and not re-run its logic.
        if tasks:
            yield tasks

    def output(self):
        # output() must return a Target, not a plain string.
        return luigi.LocalTarget('... whatever ...')

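This only works if all of the yielded tasks succeed and `MainTask` never needs to be re-run, because it forces the completeness check to be true as soon as `MainTask` has finished its first pass. If one of the yielded tasks fails, `MainTask` will still look complete when the pipeline is re-run. – HippoMan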
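The failure mode described in the comment above can be reproduced with a stdlib-only toy model. `ToyTask`, `FailingOther`, `GuardedMain` and `first_pass_then_failure` are hypothetical stand-ins, not Luigi classes, and a boolean `done` flag plays the role of the output file. The sketch runs the first pass of the guarded `run()` generator, lets the yielded task fail, and checks what the parent reports afterwards.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: `done` plays the role of the output file."""

    def __init__(self) -> None:
        self.done = False

    def complete(self) -> bool:
        return self.done


class FailingOther(ToyTask):
    """Stand-in for an OtherTask instance whose run() raises."""

    def run(self) -> None:
        raise RuntimeError("OtherTask failed")


class GuardedMain(ToyTask):
    """Mirrors the question's MainTask: guard, work, write output, then yield."""

    def __init__(self, children) -> None:
        super().__init__()
        self.children = children

    def run(self):
        if self.complete():   # the guard from the question
            return
        # ... normal run processing would go here ...
        self.done = True      # output is written *before* yielding
        yield self.children


def first_pass_then_failure(main: GuardedMain) -> None:
    """Drive main.run() up to its yield, then run the yielded tasks."""
    yielded = next(main.run())
    for child in yielded:
        try:
            child.run()
        except RuntimeError:
            pass              # the child failed; its output was never written
```

After `first_pass_then_failure`, the parent reports `complete()` as true while its child does not, and any re-invocation of `run()` exits at the guard. Because the completeness check only looks at the parent's own output, nothing would ever retry the failed child.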


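The best way I've found to handle this case is to move all the logic out of the `run` method of `MainTask` and into the `run` method of a new `AuxiliaryTask`, and make `MainTask` depend on it. `AuxiliaryTask` writes out the computed results, and `MainTask` simply reads them as its input and passes them on to `OtherTask`. Perhaps this is the only way to make this work properly, but I'd still like a way for one task to yield to another without the first task's `run` method being invoked again. – HippoMan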
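The data flow of that restructuring can be modeled without Luigi. In this sketch, which assumes the IDs are stored as a JSON list, `compute_ids`, `auxiliary_run`, `main_run`, `demo` and the `calls` counter are all hypothetical; the point is only that the expensive calculation runs once, while the step that may be re-invoked after a `yield` merely re-reads a file.

```python
import json
import os
import tempfile

calls = {"compute": 0}  # counts how often the expensive calculation runs


def compute_ids(param1: int, param2: bool) -> list:
    """Hypothetical stand-in for the ID calculation moved into AuxiliaryTask."""
    calls["compute"] += 1
    return [param1 + i for i in range(3 if param2 else 1)]


def auxiliary_run(path: str, param1: int, param2: bool) -> None:
    """Plays AuxiliaryTask: compute the IDs once and persist them as JSON."""
    if os.path.exists(path):  # stands in for an output-exists complete() check
        return
    with open(path, "w") as f:
        json.dump(compute_ids(param1, param2), f)


def main_run(path: str) -> list:
    """Plays MainTask.run(): only re-reads its input, so a re-invocation is cheap."""
    with open(path) as f:
        return json.load(f)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "aux.json")
        auxiliary_run(path, 10, True)
        first = main_run(path)          # first pass, up to the yield
        auxiliary_run(path, 10, True)   # already complete: skipped
        second = main_run(path)         # re-invocation after the yielded tasks finish
        return first, second
```

Both passes of `main_run` see the same list, and `compute_ids` is called only once, which is the property the self-answer below relies on.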




import json

import luigi


class OtherTask(luigi.Task):
    # Under some circumstances, this task can be invoked
    # from the command line, and it can also be invoked
    # in the normal luigi manner as a dependency for one
    # or more other tasks.
    # It also might be yielded to, as is done in the
    # "run" method for `MainTask`, below.

    id = luigi.parameter.IntParameter()

    def complete(self):
        # ...
        # Return True or False depending on various tests.
        # (Placeholder: fall back to Luigi's default "does output() exist" check.)
        return super().complete()

    def requires(self):
        # return [ ... various dependencies ... ]
        return []

    def run(self):
        # do stuff with self.id
        # ...
        with self.output().open('w') as f:
            f.write(str(self.id))  # placeholder: the real output isn't shown

    def output(self):
        # output() must return a Target, not a plain string.
        return luigi.LocalTarget('... something ...')

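class AuxiliaryTask(luigi.Task):
    # Same parameters as MainTask, so that self.clone(AuxiliaryTask)
    # in MainTask.requires() can pass their values through.
    param1 = luigi.parameter.IntParameter()
    param2 = luigi.parameter.BoolParameter()

    def requires(self):
        # return [ ... various dependencies ... ]
        return []

    def run(self):
        ids = []
        # Append multiple integer ID's to the `ids` list.
        # Calculate each ID depending upon the values
        # passed to this task via its parameters. Then ...
        with self.output().open('w') as f:
            json.dump(ids, f)  # persist the IDs for MainTask to read

    def output(self):
        return luigi.LocalTarget('... something else ...')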

class MainTask(luigi.Task): 
    # Parameters are expected to be supplied on the command line. 
    param1 = luigi.parameter.IntParameter() 
    param2 = luigi.parameter.BoolParameter() 
    # ... etc. ... 

    def requires(self): 
     return [ self.clone(AuxiliaryTask) ] 

    def run(self):
        # This method could get re-run after the yields,
        # below. However, it just re-reads its input, instead
        # of that input being recalculated. And in the second
        # invocation, luigi's dependency mechanism will prevent
        # any re-yielded-to tasks from repeating what they did
        # before.
        # requires() returns a list, so input() is a list of targets.
        with self.input()[0].open('r') as f:
            ids = json.load(f)  # parse the list written by AuxiliaryTask

        if ids:
            # Create a list of tasks to be invoked,
            # each one taking one of these ID's as a parameter.
            # Then, yield to all of these tasks at once.
            tasks = [OtherTask(id=the_id) for the_id in ids]
            yield tasks

        with self.output().open('w') as f:
            f.write('done')  # placeholder: the real output isn't shown

    def output(self):
        # output() must return a Target, not a plain string.
        return luigi.LocalTarget('... whatever ...')