2012-03-21 19 views
9

サブプロセスを使用して呼び出す実行可能ファイルがあります。次に、後で別のスレッドに取り込まれるQueueからその値を読み取るスレッドを使用して、stdin経由でいくつかのデータを供給するつもりです。出力は、別のスレッドのstdoutパイプを使用して読み取られ、キュー内でソートされる必要があります。python:スレッド内のサブプロセス出力を読み取る

私の以前の研究からわかるように、Queueでスレッドを使用するのは良い方法です。

外部実行可能ファイルは、残念ながら、パイプラインされているすべての行に対してすぐに答えられないので、単純な書き込み、readlineサイクルはオプションではありません。実行可能ファイルはいくつかの内部マルチスレッディングを実装しています。使用できるようになるとすぐに出力が必要なので、追加のリーダースレッドが必要になります。ただ、各ライン(shuffleline.py)をシャッフルします実行ファイルをテストするための一例として、

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

これはすでに、できるだけバッファリングされないことに注意してください。それとも?

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

この次の出力(python2.7)を得た:

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

、その後2秒間何も...

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

インターリーブこれは私のストリップダウンテストプログラムであります当初は期待されています。ただし、の後にサブプロセスの出力はまで表示されず、サブプロセスは終了します。より多くの行がパイプされて出力が得られるので、stdoutパイプにキャッシュ問題があると想定します。ここに掲載されている他の質問によると、(少なくともサブプロセスで)標準出力をフラッシュすることは、少なくともLinuxではうまくいくはずです。あなたがない限り

答えて

7

あなたの問題は、で開始するためのスレッドを使用するよりもさらに悪いsubprocessモジュールを何の関係も、またはスレッド(そのまま問題)、あるいは混合サブプロセスとスレッド(非常に悪い考えを持っていませんあなたがcode.google.com/p/python-subprocess32から得ることができるのPython 3.2のサブプロセスモジュールのバックポート)を使用したり、print文がそうであるように(複数のスレッドから同じものをアクセスする。)

何が起こるかというとそのごshuffleline.pyプログラムバッファです。出力ではなく入力にあります。それはあまり明白ではありませんが、ファイルオブジェクトを反復処理すると、Pythonはブロック(通常は8kバイト)で読み込みます。

for line in sys.stdin: 
    line = line.strip() 
    .... 

あなたがこれを行う、のいずれか(EOFのため''返す)sys.stdin.readline()を呼び出すためにwhileループを使用しないようにしたい場合は::

sys.stdinはFileObjectにあるので、あなたの forループはEOFまたはフルブロックまでバッファリングします
while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

又は第二引数(「センチネル」)が返されるまで、最初の引数を呼び出すイテレータを作成iter()の2つの引数の形式、使用:

サブスレッドのパイプではなく、ブロッキングI/Oではなく、プロセスや他のものを一緒にフックする方法がたくさんあるtwisted.reactor.spawnProcessのようなものであっても、スレッドを使用しないことを提案しても、消費者と生産者として。

+0

ありがとう、それは解決策です! – muckl

+1

サブプロセスとスレッドの混在がなぜそんなに恐ろしいアプローチなのか尋ねてもよろしいですか?何も起こっていない間に何度も何度もノンブロッキングI/Oを呼び出すよりもエレガントなようです。明らかに、スレッドはスレッドセーフではないデータ構造にアクセスすべきではありませんが、キューからの読み取りと書き込みは安全です。 Python3.2バックポートの変更は、私のような単純なケースにとって重要ですか? – muckl

+3

スレッドとサブプロセスの問題は、スレッドとforkを混在させる問題です。 http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-themおよび他のそのような記事を参照してください。 Python 3.2サブプロセスバックポートは、これらの問題を回避します。スレッドの一般的な主な問題は、制御やデバッグが難しいことです。たとえば、スレッドの「外側」からそれらを削除することはできません。したがって、スレッドが読み取りまたは書き込みに固執している場合は、何もできません。 –

関連する問題