パイプとblackmagicが必要ですが、Pythonには両方があります。
from multiprocessing import Process, Pipe
def F1(stream):
for x in stream:
yield str(x)+'a'
def F2(stream):
for x in stream:
yield x+'b'
def F3(stream):
for x in stream:
yield x+'c'
def F4(stream):
for x in stream:
yield x+'d'
class PIPE_EOF:
pass
class IterableConnection(object):
def __init__(self, pipe):
self.pipe = pipe
def __iter__(self):
return self
def __next__(self):
try:
ret = self.pipe.recv()
if ret == PIPE_EOF:
raise StopIteration
return ret
except EOFError:
raise StopIteration
def next(self):
return self.__next__()
def parallel_generator_chain(*args, **kwargs):
if 'data' in kwargs:
data = kwargs['data']
else:
raise RuntimeError('Missing "data" argument.')
def decorator(func, _input, _output):
def wrapper(*args, **kwargs):
for item in func(_input):
_output.send(item)
_output.send(PIPE_EOF)
return wrapper
for func in args:
in_end, out_end = Pipe(duplex = False)
in_end = IterableConnection(in_end)
func = decorator(func, data, out_end)
p = Process(target = func)
p.start()
data = in_end
for output in data:
yield output
if 'xrange' not in globals():
xrange = range
if __name__ == '__main__':
for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
print(x)
#for x in F4(F3(F2(F1(range(1000000))))):
# print(x)
注:私は、http://stackoverflow.com/questions/5684992/how-can-i-parallelize-a-pipeline-of-generators-iterators-in-python見てきましたが、私の実際の使用'' F1''、 'F2''、' F3'、 'F4'は実際に値を処理する際にいくつかの状態を実際に累積します。 –
誰かが完全なコードを見たいと思っているのであれば、http://pit-claudel.fr/clement/blog/an-experimental-estimation-of-the-entropy-of-english-in-50-lines-ofにあります。 -python-code/ –
状態が累積すると、それらを並列化することも可能ですか? – BrenBarn