残念ながら、コンシューマの起動後に配列が生成されているため、その周囲に簡単な方法はありません。 (だからグローバル変数アプローチは醜いだろう...)。


from multiprocessing import Process, Queue 
import numpy as np 

class __EndToken(object): 

def parrallel_pipeline(buffer_size=50): 
    def parrallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 

     def parallel_generator(f_xs): 
      q = Queue(buffer_size) 
      consumer_process = Process(target=consumer,args=(f_xs,q,)) 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parrallel_pipeline_with_args 

def f(xs): 
    for x in xs: 
     yield x + 1.0 

def g(xs): 
    for x in xs: 
     yield x * 3 

def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

if __name__ == "__main__": 
    rs = f(g(h(xs()))) 
    for r in rs: 
     print r 

いくつかのコードを共有できますか? –


実際のコードではありません。ウィルも同様のことを模索するだろう。 –


それは[最小、完全で、検証可能な例](http://stackoverflow.com/help/mcve)... –



スレッド間でメモリを共有するか、スレッドの代わりに、あなたがnumpyのを使っているので、あなたがそのthe global interpreter lock is released during numpy computations事実を利用することができます


使用を処理します。つまり、マルチプロセッシングとプロセス間通信の代わりに、標準スレッドと共有メモリで並列処理を行うことができます。ここにあなたのコードのバージョンがあり、マルチプロセッシングの代わりにthreading.ThreadとQueue.Queueを使うよう調整されています。プロセスとマルチプロセッシング.Queue。これは、節約しないで、キューを介してnumpy ndarrayを渡します。私のコンピュータでは、これはあなたのコードより約3倍高速です。 (ただし、それは私がさらに下にいくつかの他のアプローチを示唆している。あなたのコードのシリアルバージョンよりも20%程度しか高速です。)

from threading import Thread 
from Queue import Queue 
import numpy as np 

class __EndToken(object): 

def parallel_pipeline(buffer_size=50): 
    def parallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 

     def parallel_generator(f_xs): 
      q = Queue(buffer_size) 
      consumer_process = Thread(target=consumer,args=(f_xs,q,)) 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parallel_pipeline_with_args 

def f(xs): 
    for x in xs: 
     yield x + 1.0 

def g(xs): 
    for x in xs: 
     yield x * 3 

def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

rs = f(g(h(xs()))) 
%time print sum(r.sum() for r in rs) # 12.2s 


ものに近い別のオプション、あなたが要求したのは、マルチプロセッシングパッケージの使用を継続することですが、共有メモリに格納された配列を使ってプロセス間でデータを渡します。以下のコードは、それを行うための新しいArrayQueueクラスを作成します。 ArrayQueueオブジェクトは、サブプロセスを生成する前に作成する必要があります。これは、共有メモリにバックアップされたnumpy配列のプールを作成および管理します。結果配列がキューにプッシュされると、ArrayQueueはその配列のデータを既存の共有メモリ配列にコピーし、共有メモリ配列のidをキューに渡します。配列全体をキューに送るよりもはるかに高速です。なぜなら、配列をpickle化することを避けるからです。上記のスレッドバージョンと似たパフォーマンス(約10%遅い)があり、グローバルインタプリタロックが問題である場合(つまり、関数で多くのPythonコードを実行した場合)、スケーラビリティが向上する可能性があります。サンプルの代わりに機能


from multiprocessing import Process, Queue, Array 
import numpy as np 

class ArrayQueue(object): 
    def __init__(self, template, maxsize=0): 
     if type(template) is not np.ndarray: 
      raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.') 
     if maxsize == 0: 
      # this queue cannot be infinite, because it will be backed by real objects 
      raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.') 

     # find the size and data type for the arrays 
     # note: every ndarray put on the queue must be this size 
     self.dtype = template.dtype 
     self.shape = template.shape 
     self.byte_count = len(template.data) 

     # make a pool of numpy arrays, each backed by shared memory, 
     # and create a queue to keep track of which ones are free 
     self.array_pool = [None] * maxsize 
     self.free_arrays = Queue(maxsize) 
     for i in range(maxsize): 
      buf = Array('c', self.byte_count, lock=False) 
      self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape) 

     self.q = Queue(maxsize) 

    def put(self, item, *args, **kwargs): 
     if type(item) is np.ndarray: 
      if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count: 
       # get the ID of an available shared-memory array 
       id = self.free_arrays.get() 
       # copy item to the shared-memory array 
       self.array_pool[id][:] = item 
       # put the array's id (not the whole array) onto the queue 
       new_item = id 
       raise ValueError(
        'ndarray does not match type or shape of template used to initialize ArrayQueue' 
      # not an ndarray 
      # put the original item on the queue (as a tuple, so we know it's not an ID) 
      new_item = (item,) 
     self.q.put(new_item, *args, **kwargs) 

    def get(self, *args, **kwargs): 
     item = self.q.get(*args, **kwargs) 
     if type(item) is tuple: 
      # unpack the original item 
      return item[0] 
      # item is the id of a shared-memory array 
      # copy the array 
      arr = self.array_pool[item].copy() 
      # put the shared-memory array back into the pool 
      return arr 

class __EndToken(object): 

def parallel_pipeline(buffer_size=50): 
    def parallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 

     def parallel_generator(f_xs): 
      q = ArrayQueue(template=np.zeros(0,1,(500,2000)), maxsize=buffer_size) 
      consumer_process = Process(target=consumer,args=(f_xs,q,)) 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parallel_pipeline_with_args 

def f(xs): 
    for x in xs: 
     yield x + 1.0 

def g(xs): 
    for x in xs: 
     yield x * 3 

def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

print "multiprocessing with shared-memory arrays:" 
%time print sum(r.sum() for r in f(g(h(xs())))) # 13.5s 

並列処理はシングルスレッドバージョン(以下に示すシリアルバージョン12.2S対14.8s)よりもわずかに約20%高速です。これは、各関数が単一のスレッドまたはプロセスで実行され、ほとんどの処理がxs()によって実行されるためです。上記の例の実行時間は、%time print sum(1 for x in xs())を実行した場合とほぼ同じです。


import multiprocessing 
import threading, Queue 
import numpy as np 

def f(x): 
    return x + 1.0 

def g(x): 
    return x * 3 

def h(x): 
    return x * x 

def final(i): 
    return f(g(h(x(i)))) 

def final_sum(i): 
    return f(g(h(x(i)))).sum() 

def x(i): 
    # produce sample number i 
    return np.random.uniform(0, 1, (500, 2000)) 

def rs_serial(func, n): 
    for i in range(n): 
     yield func(i) 

def rs_parallel_threaded(func, n): 
    todo = range(n) 
    q = Queue.Queue(2*n_workers) 

    def worker(): 
     while True: 
       # the global interpreter lock ensures only one thread does this at a time 
       i = todo.pop() 
      except IndexError: 
       # none left to do 

    threads = [] 
    for j in range(n_workers): 
     t = threading.Thread(target=worker) 
     threads.append(t) # in case it's needed later 

    while True: 
     x = q.get() 
     if x is None: 
      yield x 

def rs_parallel_mp(func, n): 
    pool = multiprocessing.Pool(n_workers) 
    return pool.imap_unordered(func, range(n)) 

n_workers = 4 
n_samples = 1000 

print "serial:" # 14.8s 
%time print sum(r.sum() for r in rs_serial(final, n_samples)) 
print "threaded:" # 10.1s 
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples)) 

print "mp return arrays:" # 19.6s 
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples)) 
print "mp return results:" # 8.4s 
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples)) 




import multiprocessing, itertools, math 
import numpy as np 

def f(xs): 
    for x in xs: 
     yield x + 1.0 

def g(xs): 
    for x in xs: 
     yield x * 3 

def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

def final(): 
    return f(g(h(xs()))) 

def final_sum(): 
    for x in f(g(h(xs()))): 
     yield x.sum() 

def get_chunk(args): 
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation.""" 
    return list(itertools.islice(args[0](), args[1])) 

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50): 
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes. 
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk) 
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory).""" 

    pool = multiprocessing.Pool(n_workers) 

    # how many chunks will be needed to yield at least max_items items? 
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size))) 

    # generate a suitable series of arguments for get_chunk() 
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks) 

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items) 
    chunk_gen = pool.imap_unordered(get_chunk, args_list) 

    # parallel_gen flattens the chunks, and yields individual items 
    parallel_gen = itertools.chain.from_iterable(chunk_gen) 

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items) 

# in this case, the parallel version is slower than a single process, probably 
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?) 
print "serial, return arrays:" # 15.3s 
%time print sum(r.sum() for r in final()) 
print "parallel, return arrays:" # 24.2s 
%time print sum(r.sum() for r in parallelize(final, max_items=1000)) 

# in this case, the parallel version is more than twice as fast as the single-thread version 
print "serial, return result:" # 15.1s 
%time print sum(r for r in final_sum()) 
print "parallel, return result:" # 6.8s 
%time print sum(r for r in parallelize(final_sum, max_items=1000)) 


これの最初の文は私が知らなかった非常に有用なものです。 +1 –


私が窓を走っているという事実と関係がある(__main__名前空間(何も装飾されていない)には何も漬けません)...これは何か助けになりますか? (f()、g()、h()のそれぞれの中にパックを入れて解凍する必要があります)


from multiprocessing import Process, freeze_support 
from multiprocessing.sharedctypes import Value, Array 
import numpy as np 

def package(arr): 
    shape = Array('i', arr.shape, lock=False) 

    if arr.dtype == float: 
     ctype = Value('c', b'd') #d for double #f for single 
    if arr.dtype == int: 
     ctype = Value('c', b'i') #if statements could be avoided if data is always the same 
    data = Array(ctype.value, arr.reshape(-1),lock=False) 

    return data, shape 

def unpack(data, shape): 
    return np.array(data[:]).reshape(shape[:]) 

def f(args): 

if __name__ == '__main__': 

    a = np.array([1,2,3,4,5]) 
    a_packed = package(a) 
    print('array has been packaged') 

    p = Process(target=f, args=(a_packed,)) 
    print('passing to parallel process') 

    print('joining to parent process') 

酸洗上の標準multiprocessing依存を回避する、Pathos-multiprocessing projectをチェックしてください。これにより、酸漬けの非効率性を回避し、読み取り専用の共有リソース用の共通メモリにアクセスできるようになります。 Pathosはフルピップパッケージでの展開に近づいていますが、その間にインストールすることをお勧めします。pip install git+https://github.com/uqfoundation/pathos
