2013-12-20 5 views
8

が想定ジェネレータのシーケンスを並列化:私はこのようになりパイソンストリーム処理コードを有する

def F1(stream): 
    for x in stream: 
     yield f1(x) 

def F2(stream): 
    for x in stream: 
     yield f2(x) 

def F3(stream): 
    for x in stream: 
     yield f3(x) 

def F4(stream): 
    for x in stream: 
     yield f4(x) 


for x in F4(F3(F2(F1(range(1000000))))): 
    print(x) 

これは、UNIX(rangeコマンドを想定)にrange 1000000 | F1 | F2 | F3 | F4とほぼ同等であるが、中にUnixの各工程におけるパイプは平行して走る。

Pythonコードを簡単に並列化する方法はありますか?

+0

注:私は、http://stackoverflow.com/questions/5684992/how-can-i-parallelize-a-pipeline-of-generators-iterators-in-python見てきましたが、私の実際の使用'' F1''、 'F2''、' F3'、 'F4'は実際に値を処理する際にいくつかの状態を実際に累積します。 –

+0

誰かが完全なコードを見たいと思っているのであれば、http://pit-claudel.fr/clement/blog/an-experimental-estimation-of-the-entropy-of-english-in-50-lines-ofにあります。 -python-code/ –

+1

状態が累積すると、それらを並列化することも可能ですか? – BrenBarn

答えて

3

パイプとblackmagicが必要ですが、Pythonには両方があります。

from multiprocessing import Process, Pipe 


def F1(stream): 
    for x in stream: 
     yield str(x)+'a' 

def F2(stream): 
    for x in stream: 
     yield x+'b' 

def F3(stream): 
    for x in stream: 
     yield x+'c' 

def F4(stream): 
    for x in stream: 
     yield x+'d' 



class PIPE_EOF: 
    pass 

class IterableConnection(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def __iter__(self): 
     return self 

    def __next__(self): 
     try: 
      ret = self.pipe.recv() 
      if ret == PIPE_EOF: 
       raise StopIteration 
      return ret 
     except EOFError: 
      raise StopIteration 

    def next(self): 
     return self.__next__() 


def parallel_generator_chain(*args, **kwargs): 
    if 'data' in kwargs: 
     data = kwargs['data'] 
    else: 
     raise RuntimeError('Missing "data" argument.') 

    def decorator(func, _input, _output): 
     def wrapper(*args, **kwargs): 
      for item in func(_input): 
       _output.send(item) 
      _output.send(PIPE_EOF) 
     return wrapper 

    for func in args: 
     in_end, out_end = Pipe(duplex = False) 
     in_end = IterableConnection(in_end) 
     func = decorator(func, data, out_end) 
     p = Process(target = func) 
     p.start() 
     data = in_end 

    for output in data: 
     yield output 



if 'xrange' not in globals(): 
    xrange = range 


if __name__ == '__main__': 
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000): 
     print(x) 

#for x in F4(F3(F2(F1(range(1000000))))): 
# print(x) 
+0

遅れてご返事ありがとうございます!あなたのコードを動作させようとしていましたが、私は管理しませんでした(Python 3.3)。あなたの 'decorator'関数は何も返しませんし、コードは何の出力も生成しません。私はデコレータに戻りましたが、_pickle.PicklingError:pickle :属性ルックアップbuiltins.function failed'を取得しました。コードは適切に実行されましたか?再度、感謝します。 –

+0

あなたは正しいです。私は作業コードより概念の証明のように書いたので、テストしませんでした。しかし、私は実際のバージョンで自分の投稿を編集しました。それはPython 2と3の両方で動作するはずです。 – smeso

+0

私はまだ "_pickle.PicklingError:pickle を取得します:属性lookup builtins.function failed" thingy on Windows; Unix上でそれを試して、あなたの投稿を保ちます。 –

関連する問題