luigiでは、タスクが別のタスクに帰着すると、2番目のタスクが元のタスクの新しい依存関係になり、結果として得られたタスクが完了した後に元のタスクが再実行されることを理解しています。luigi:タスクは依存関係を作成せずに他のタスクを実行しますか?
ただし、特定のケースでは、私は、繰延-に従属関係になってタスクせずに、別のタスクに延期するタスクをしたいと思います。私は私の現在のタスクのrun
方法は、他のタスクの完了後に再実行することにしたくないので、私はこれをしたい理由があります。
はい、私は私のrun
方法が冪等する必要があることを知っています。それにもかかわらず、私は絶対に他のタスクに降伏した後にそのメソッドを2回実行したくない場合があります。
私はこれを行う方法を考え出したが、私はそれが最善の解決策だかはわからない、とあなたがたのうち、いずれかを持っている場合、私は、いくつかの提案をしたいと思います。 MainTask
とOtherTask
:
は、次の2つのタスクがあると仮定します。 MainTask
は、さまざまなパラメータを使用してコマンドラインから呼び出されます。これらのパラメータの設定に応じて、MainTask
はOtherTask
を起動することがあります。もしそうなら、私はrun
メソッドMainTask
をもう一度起動したくありません。
class OtherTask(luigi.Task):
# Under some circumstances, this task can be invoked
# from the command line, and it can also be invoked
# in the normal luigi manner as a dependency for one
# or more other tasks.
# It also might be yielded to, as is done in the
# "run" method for `MainTask`, below.
id = luigi.parameter.IntParameter()
def complete(self):
# ...
# return True or False depending on various tests.
def requires(self):
# return [ ... various dependencies ... ]
def run(self):
# do stuff with self.id
# ...
with self.output().open('w') as f:
f.write('OK')
def output(self):
return '... something ...'
class MainTask(luigi.Task):
# Parameters are expected to be supplied on the command line.
param1 = luigi.parameter.IntParameter()
param2 = luigi.parameter.BoolParameter()
# ... etc. ...
def run(self):
#
# Here's how I keep this "run" method from being
# invoked more than once. Is there a better way
# to invoke `OtherTask` without having it cause
# this current task to be re-invoked?
if self.complete():
return
# Normal "run" processing for this task ...
# ... etc. ...
# Possibly run `OtherTask` multiple times, only if
# certain conditions are met ...
tasks = []
if the_conditions_are_met:
ids = []
# Append multiple integer ID's to the `ids` list.
# Calculate each ID depending upon the values
# passed in via self.param1, self.param2, etc.
# Do some processing depending on these ID's.
# ... etc. ...
# Then, create a list of tasks to be invoked,
# each one taking one of these ID's as a parameter.
for the_id in ids:
tasks.append(OtherTask(id=the_id))
with self.output().open('w') as f:
f.write('OK')
# Optionally yield after marking this task as
# complete, so that when the yielded tasks have
# all run, this task's "run" method can test for
# completion and not re-run its logic.
if tasks:
yield tasks
def output(self):
return '... whatever ...'
、すべての得られたタスクが成功すると、それは 'MainTask'が最初に終了した後に真であると完了検査を強制するために、再実行する必要はありません場合は、この唯一の作品。タスクが失敗したにもかかわらず、再実行された場合は完了したように見えます。 – HippoMan
私がこのケースを処理する最善の方法は、すべてのロジックを 'MainTask'の' run'メソッドから取り出して、新しい 'AuxiliaryTask'の' run'メソッドに入れることです。 'MainTask'への依存。 'AuxiliaryTask'は計算結果を出力し、' MainTask'はそれを単に入力として読み込み、 'OtherTask'に出力します。おそらくこれが適切に動作する唯一の方法ですが、私はまだ最初のタスクの 'run'メソッドを再呼び出しすることなく、あるタスクが別のタスクに遅れをとる方法を望んでいます。 – HippoMan