2009-09-15 20 views
4

LinuxでDjangoを実行していて、ビューを取得していて、そのサブプロセスのデータをlikeso例えば、ビューが作成したファイルで動作CMD:今Djangoでは、起動時間が遅いサブプロセスを呼び出す方法

def call_subprocess(request): 
    response = HttpResponse() 

    with tempfile.NamedTemporaryFile("W") as f: 
     f.write(request.GET['data']) # i.e. some data 

    # cmd operates on fname and returns output 
    p = subprocess.Popen(["cmd", f.name], 
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE) 

    out, err = p.communicate() 

    response.write(p.out) # would be text/plain... 
    return response 

CMDは非常に遅い起動時間を持っていると仮定しますが、非常に高速な動作時間、およびそれネイティブにデーモンモードはありません。私はこの見解の応答時間を改善したいと思います。

私は彼らが入力を待つ必要があり、かつは1を尋ねるcall_processを持つ、システム全体がはるかに高速労働者、プール内のcmdをのインスタンスの数を起動して実行しますしたいと思いますそれらのワーカー・プール・プロセスのうちの1つがデータを処理します。

パートCMD、入力用CMD待機を呼び出し1関数:

これは実際に2重量部です。これは

def _run_subcmd(): 
    p = subprocess.Popen(["cmd", fname], 
     stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

    out, err = p.communicate() 
    # write 'out' to a tmp file 
    o = open("out.txt", "W") 
    o.write(out) 
    o.close() 
    p.close() 
    exit() 

def _run_cmd(data): 
    f = tempfile.NamedTemporaryFile("W") 
    pipe = os.mkfifo(f.name) 

    if os.fork() == 0: 
     _run_subcmd(fname) 
    else: 
     f.write(data) 

    r = open("out.txt", "r") 
    out = r.read() 
    # read 'out' from a tmp file 
    return out 

def call_process(request): 
    response = HttpResponse() 

    out = _run_cmd(request.GET['data']) 

    response.write(out) # would be text/plain... 
    return response 

は、第2部、すなわち

、パイプを使用してデータを待機しているバックグラウンドで実行されている労働者のセットを行うことができます。つまり、サブプロセスが既に実行中であるように上記を拡張したいと考えています。 Djangoのインスタンスを初期化し、またはこのcall_processが最初に呼び出され、これらの労働者のセットが

WORKER_COUNT = 6 
WORKERS = [] 

class Worker(object): 
    def __init__(index): 
     self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name 
     os.mkfifo(self.tmp_file.name) 
     self.p = subprocess.Popen(["cmd", self.tmp_file], 
      stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
     self.index = index 

    def run(out_filename, data): 
     WORKERS[self.index] = Null # qua-mutex?? 
     self.tmp_file.write(data) 
     if (os.fork() == 0): # does the child have access to self.p?? 
      out, err = self.p.communicate() 
      o = open(out_filename, "w") 
      o.write(out) 
      exit() 

     self.p.close() 
     self.o.close() 
     self.tmp_file.close() 
     WORKERS[self.index] = Worker(index) # replace this one 
     return out_file 

    @classmethod 
    def get_worker() # get the next worker 
    # ... static, incrementing index 

を作成しているどこかに、このような労働者のいくつかの初期化がなければならないとき:

def init_workers(): # create WORKERS_COUNT workers 
    for i in xrange(0, WORKERS_COUNT): 
     tmp_file = tempfile.NamedTemporaryFile() 
     WORKERS.push(Worker(i)) 

今、私が上に持っているものは何かになる:

今質問:

  1. これは機能しますか? (私はちょうどStackOverflowに私の頭の上にこれをタイプしたので、問題があると確信していますが、概念的にはうまくいくでしょう)

  2. 何が問題ですか?

  3. これに代わる方法はありますか?例えばスレッドも同様に動作します(Debian Lenny Linuxです)。このような並列プロセスワーカープールを扱うライブラリはありますか?

  4. 私が意識しているはずのDjangoとのやり取りはありますか?

ありがとうございました!これが私のように面白い問題だと思ってください。

ブライアン

答えて

3

これは私がこれを推奨したのは今回が2度目なので、私はこの製品を叩いているようです。

しかし、メッセージキューサービス、特に分散メッセージキューが必要なようです。

  1. あなたのDjangoのアプリケーションはCMDが
  2. CMDは、それが実行され、結果が上流返さ
  3. いくつかの作品にプッシュされますキューに追加されます
  4. CMD
  5. 要求:

    EREは、それが動作する方法です

このコードのほとんどは存在し、独自のシステムを構築する必要はありません。

最初にDjangoで構築されたCeleryを見てください。

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

+0

これは面白いです - 私はそれを調べるでしょう。しかし、私が(またはしていない)問題は、#2( "CMDがキューに追加される"、つまりLaTeXがデータを取得する)の前に、ステップ#4のパート1( "実行されています"、つまりLaTeXが開始されている) )。しかし、私はかなりセロリがこれを行うことができると確信していますが、それは少しの掘り下げを必要とします。ありがとうAsksol。 –

0

方法 "daemonizing" python-daemonまたはその後継、grizzledを使用して、サブプロセスの呼び出しについて。

+0

とは関係ありませんが、それは何が良いですか?私はpythonデーモンに関連するいくつかの問題を抱えていましたが、私は本質的にコレクションライブラリに対して懐疑的です。 – asksol

3

イシはすでにセロリを述べたが、コメントはうまく コードサンプルでは動作しないので、私が代わりに答えとして返信します。

セロリをAMQP結果ストアと同期して使用するようにしてください。 実際の実行を別のプロセスまたは別のマシンに配布することができます。セロリで同期実行すると、例えば、簡単です:

>>> from celery.task import Task 
>>> from celery.registry import tasks 

>>> class MyTask(Task): 
... 
...  def run(self, x, y): 
...   return x * y 
>>> tasks.register(MyTask) 

>>> async_result = MyTask.delay(2, 2) 
>>> retval = async_result.get() # Now synchronous 
>>> retval 4 

AMQP結果ストアは 、非常に高速な結果を送り返すなりますが、それは現在の開発バージョンでのみ利用可能です(コードフリーズに 0.8.0になるために)

+0

1つの要件は、タスクを永遠にデーモンとして実行し、それからデータを送信/受信することです。 run()を呼び出す前にLaTeXが動作している必要があります(そうしないとLaTeXが起動するのを待たなければなりません)。私はそれがこれを行うことができるかどうかを見るためにセロリを探しています(私はそれができると思います)。 –

+0

LaTeXが最適化されていなければならないという要件はありませんか?これを行うには、LaTeX C API(またはそれが何であれ)を使用してワーカープロセス内に埋め込まなければなりません。これは可能なはずですが、セロリをかなりカスタマイズする必要があります。あなたの問題の一部をすでに解決しているので、それは良い出発点かもしれません。私はタスクキューがこれに適しているとは言っていませんでしたが、分散/並列処理部分は*かもしれません。タスクプールが必要で、結果を送受信するには、ワーカープロセスをLaTeXプロセッサにします。 – asksol

関連する問題