2012-03-06 18 views
10

私はdjangoとrabbitmqでセロリを使用してメッセージキューを作成しています。私はまた別のマシンから生まれたワーカーを持っています。セロリ - タスクのコール機能を完了

def processtask(request, name): 
    args = ["ls", "-l"] 
    MyTask.delay(args) 
    return HttpResponse("Task set to execute.") 

が私の仕事は、このように構成されています:Djangoのビューでは、私はこのようなプロセスを始めている

class MyTask(Task): 
    def run(self, args): 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    (out, err) = p.communicate() 
    return out 

私の質問は今ブローカー(私のDjangoプロジェクトが)今受け取ることができる方法です作業者が自分のコンピュータ上で実行した "ls -l"コマンドの出力です。私は、実行されたコマンドからの出力を送信する準備ができたら、ブローカーの関数をワーカーが呼び出すのが最善の策だと思います。

出力をワーカーから非同期で受け取って、Webページを出力に更新したいのですが、それは別の時間です。今のところ私は労働者からのアウトプットを受け取ることを好むだけです。

更新

は、今私は、タスクが実行され、Webアプリケーションに通知タスクの終了時にトリガされたHTTP GETリクエストを追加しました - 私は、HTTP GETにTASK_IDを送信しています。 HTTPのGETメソッドは、AsyncResultを作成し、結果を取得した、Djangoのビューを呼び出しますが、問題は、result.get()を呼び出すときに、私は次のエラーを取得:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration. 
    "Polling results with transaction isolation level" 

任意のアイデアなぜ?私はAMQPでrabbitmqを使用しているのでデータベースを使用していません。

更新。

3番目のオプションを使用することをお勧めします。これは、最適なオプションのように思われます。

class MyTask(Task): 
    def __call__(self, *args, **kwargs): 
    return self.run(*args, **kwargs) 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    if self.webhost is not None: 
     conn = httplib.HTTPConnection(self.webhost, self.webport) 
     conn.request("HEAD", "/vuln/task/output/"+task_id) 

    def run(self, args, webhost=None, webport=None): 
    self.webhost = webhost 
    self.webport = webport 
    r = "This is a basic result string used for code clarity" 
    return r 

だから私は、タスクの実行()関数は、すでに値を戻したことからも、私の仕事上のロックを解除する必要がありafter_return機能を、上書きしました:私の全体のタスクは、次のようになります。 HEADリクエストでは、基本的にdjango関数を呼び出しています。これはタスクの結果を提供するtask_idでAsyncResultを呼び出します。私はテスト目的のためにテスト目的のために任意の結果を使用しました。

私は上記のコードが機能しない理由を知りたいと思います。私はon_successを使うことができますが、それが違いを生むとは思いませんか?

+0

コマンドの出力をデータベースに保存できますか? – jpic

+0

こんにちは、従業員はブローカーのデータベースへのアクセス権を持たず、アクセス権を持たないためです。私は間違いなく結果を送り返し、ブローカーで処理する必要があります。 – eleanor

+1

HTTP APIを使って結果を返すことができますか? Djangoでこれを行う簡単な方法がいくつかあります。 – jpic

答えて

14

あなたは以下のでしょうhere見れば:

ジャンゴ - セロリは基本的に通信バスとして使用されているすべてのタスク/結果、ウサギ-MQを追跡するためにMySQLを使用しています。

実際に何が起こっているのは、タスクがまだ実行されている間に作業員のASyncResultを取得しようとしていることです(このタスクはサーバーにHTTPリクエストを呼び出しました。作業者はまだアクティブであり、結果行はまだロックされています)。 Djangoはタスクの結果(状態と実行関数の実際の戻り値)を読み込もうとすると、行がロックされていることがわかり、警告が表示されます。

これを解決について移動するいくつかの方法があります。

  1. 結果を享受し、それはあなたの処理タスクにチェーンに別のセロリのタスクを設定しますが。そうすれば元のタスクは終了し、dbのロックを解除し、新しいものを取得し、djangoの結果を読み込み、必要な処理を行います。これについてセロリの文書を見てください。

  2. DBでフェッチするのではなく、処理結果をペイロードとして添付してDjangoにPOSTを実行するだけです。

  3. タスククラスのon_successをオーバーライドし、Djangoへの通知リクエストをPOSTしてから、dbテーブルのロックを解放する必要があります。

runメソッドの復帰(処理された可能性があります)には、処理結果全体(それがどれほど大きくても)を保存する必要があることに注意してください。結果がどれくらい大きいかは言及していないので、実際には上記のシナリオ#2を実行するのが理にかなっているかもしれません(これは私がやることです)。あるいは、私は#3と一緒に行くだろう。また、あなたのタスクでon_failureメソッドも処理することを忘れないでください。

+0

ご意見ありがとうございます。私はあなたの答えを受け入れる前に答えが必要な追加の質問をするために私の答えを更新しました。これは本当に良いbtwです。 – eleanor

関連する問題