2011-07-31 10 views
8

私はPythonでsubprocessモジュールを使用して標準入力を読み込み、標準出力をストリーミング形式で書き込むプロセスと通信しようとしています。私は、入力を生成するイテレータからサブプロセスを読み込み、サブプロセスから出力行を読みたいと思っています。入力ラインと出力ラインとの間には1対1の対応関係はない。どのように文字列を返す任意のイテレータからサブプロセスをフィードできますか?ここでPythonイテレータからサブプロセスの標準入力をどのように送ることができますか?

は、簡単なテストケースを与えるいくつかのサンプルコード、および何らかの理由で動作しない、私は試してみましたいくつかの方法である:

#!/usr/bin/python 
from subprocess import * 
# A really big iterator 
input_iterator = ("hello %s\n" % x for x in xrange(100000000)) 

# I thought that stdin could be any iterable, but it actually wants a 
# filehandle, so this fails with an error. 
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE) 

# This works, but it first sends *all* the input at once, then returns 
# *all* the output as a string, rather than giving me an iterator over 
# the output. This uses up all my memory, because the input is several 
# hundred million lines. 
subproc = Popen("cat", stdin=PIPE, stdout=PIPE) 
output, error = subproc.communicate("".join(input_iterator)) 
output_lines = output.split("\n") 

それでは、どのように私は私のサブプロセスは、ANから読み取ることができますイテレータは行ごとに標準出力から読み込みますが、

答えて

5

簡単な方法は、入力プロセスをフォークして子プロセスから供給することです。誰もこれを行う可能性のある欠点について詳しく説明できますか?または、より簡単で安全なPythonモジュールがありますか? Pythonのイテレータからサブプロセスの標準入力を養うために

#!/usr/bin/python 
from subprocess import * 
import os 

def fork_and_input(input, handle): 
    """Send input to handle in a child process.""" 
    # Make sure input is iterable before forking 
    input = iter(input) 
    if os.fork(): 
     # Parent 
     handle.close() 
    else: 
     # Child 
     try: 
      handle.writelines(input) 
      handle.close() 
     # An IOError here means some *other* part of the program 
     # crashed, so don't complain here. 
     except IOError: 
      pass 
     os._exit() 

# A really big iterator 
input_iterator = ("hello %s\n" % x for x in xrange(100000000)) 

subproc = Popen("cat", stdin=PIPE, stdout=PIPE) 
fork_and_input(input_iterator, subproc.stdin) 

for line in subproc.stdout: 
    print line, 
+1

子プロセスで 'exit()'を実行すると、 'SystemExit'が呼び出されます。代わりに['os._exit(0)'](https://docs.python.org/2/library/os.html#os._exit)を使うべきです。 – hakanc

+1

['os.fork () '](http://stackoverflow.com/a/32331150/4279)を参照してください。 'os.fork()'の可能性のある問題の例を以下に示します:[標準ライブラリのロックはforkでサニタイズする必要があります](http://bugs.python.org/issue6721) – jfs

0

フォローthis recipeこれは非同期I/Oをサポートするサブプロセスのアドオンです。ただし、サブプロセスが各入力行または行のグループに出力の一部で応答する必要があります。

+1

プログラムが入力行ごとに出力を生成するとは保証できません。実際には、おそらくそうではありません。 –

+0

申し訳ありませんが、私は正確ではありませんでした:あなたのメインプロセスは、サブプロセスに十分なインプットを供給して、何らかのアウトプットを生成し、このアウトプットを読んで、サブプロセスにいくつかのインプットを送り、ループ。この場合、私のリンクが指し示すレシピがあなたを助けるかもしれません。要点は、すべての入力を見る前に、サブプロセスが出力の生成を開始できることです。 –

+0

Hmm。私のパイプラインには(オプションによって)ソートステップがある可能性があるので、すべての入力を受け取るまで、ほとんどの出力を生成しないでしょう。 –

2

#!/usr/bin/env python3 
from subprocess import Popen, PIPE 

with Popen("sink", stdin=PIPE, bufsize=-1) as process: 
    for chunk in input_iterator: 
     process.stdin.write(chunk) 

あなたが同時に出力を読みたいなら、あなたはthreadsやasync.ioが必要になります。

#!/usr/bin/env python3 
import asyncio 
import sys 
from asyncio.subprocess import PIPE 
from contextlib import closing 

async def writelines(writer, lines): 
    # NOTE: can't use writer.writelines(lines) here because it tries to write 
    # all at once 
    with closing(writer): 
     for line in lines: 
      writer.write(line) 
      await writer.drain() 

async def main(): 
    input_iterator = (b"hello %d\n" % x for x in range(100000000)) 
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE) 
    asyncio.ensure_future(writelines(process.stdin, input_iterator)) 
    async for line in process.stdout: 
     sys.stdout.buffer.write(line) 
    return await process.wait() 

if sys.platform == 'win32': 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 
with closing(loop): 
    sys.exit(loop.run_until_complete(main())) 
関連する問題