2016-07-29 4 views
10

私は、プロデューサコンシューマパイプラインの一部として、読み取り専用numpy配列をキューに入れているマルチプロセッシングジョブを持っています。ファストキューの読み取り専用numpy配列

現在のところ、これはデフォルト動作のmultiprocessing.Queueであるため、パフォーマンスが低下します。

アレイを酸洗するのではなく、共有メモリへの参照を渡すことはできませんか?

残念ながら、コンシューマの起動後に配列が生成されているため、その周囲に簡単な方法はありません。 (だからグローバル変数アプローチは醜いだろう...)。

[次のコードでは、h(x0)とh(x1)が並行して計算されるとは考えていません。代わりに、私たちは、h(X0)とg(CPUにおけるパイプラインのような)並列に計算さ(H(×1))。]参照

from multiprocessing import Process, Queue 
import numpy as np 

class __EndToken(object): 
    pass 

def parrallel_pipeline(buffer_size=50): 
    def parrallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 
       q.put(x) 
      q.put(__EndToken()) 

     def parallel_generator(f_xs): 
      q = Queue(buffer_size) 
      consumer_process = Process(target=consumer,args=(f_xs,q,)) 
      consumer_process.start() 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
        break 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parrallel_pipeline_with_args 


@parrallel_pipeline(3) 
def f(xs): 
    for x in xs: 
     yield x + 1.0 

@parrallel_pipeline(3) 
def g(xs): 
    for x in xs: 
     yield x * 3 

@parrallel_pipeline(3) 
def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

if __name__ == "__main__": 
    rs = f(g(h(xs()))) 
    for r in rs: 
     print r 
+0

いくつかのコードを共有できますか? –

+0

実際のコードではありません。ウィルも同様のことを模索するだろう。 –

+0

それは[最小、完全で、検証可能な例](http://stackoverflow.com/help/mcve)... –

答えて

11

スレッド間でメモリを共有するか、スレッドの代わりに、あなたがnumpyのを使っているので、あなたがそのthe global interpreter lock is released during numpy computations事実を利用することができます

マルチプロセッシング

使用を処理します。つまり、マルチプロセッシングとプロセス間通信の代わりに、標準スレッドと共有メモリで並列処理を行うことができます。ここにあなたのコードのバージョンがあり、マルチプロセッシングの代わりにthreading.ThreadとQueue.Queueを使うよう調整されています。プロセスとマルチプロセッシング.Queue。これは、節約しないで、キューを介してnumpy ndarrayを渡します。私のコンピュータでは、これはあなたのコードより約3倍高速です。 (ただし、それは私がさらに下にいくつかの他のアプローチを示唆している。あなたのコードのシリアルバージョンよりも20%程度しか高速です。)

from threading import Thread 
from Queue import Queue 
import numpy as np 

class __EndToken(object): 
    pass 

def parallel_pipeline(buffer_size=50): 
    def parallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 
       q.put(x) 
      q.put(__EndToken()) 

     def parallel_generator(f_xs): 
      q = Queue(buffer_size) 
      consumer_process = Thread(target=consumer,args=(f_xs,q,)) 
      consumer_process.start() 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
        break 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parallel_pipeline_with_args 

@parallel_pipeline(3) 
def f(xs): 
    for x in xs: 
     yield x + 1.0 

@parallel_pipeline(3) 
def g(xs): 
    for x in xs: 
     yield x * 3 

@parallel_pipeline(3) 
def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

rs = f(g(h(xs()))) 
%time print sum(r.sum() for r in rs) # 12.2s 

ストアnumpyのアレイ共有メモリに

ものに近い別のオプション、あなたが要求したのは、マルチプロセッシングパッケージの使用を継続することですが、共有メモリに格納された配列を使ってプロセス間でデータを渡します。以下のコードは、それを行うための新しいArrayQueueクラスを作成します。 ArrayQueueオブジェクトは、サブプロセスを生成する前に作成する必要があります。これは、共有メモリにバックアップされたnumpy配列のプールを作成および管理します。結果配列がキューにプッシュされると、ArrayQueueはその配列のデータを既存の共有メモリ配列にコピーし、共有メモリ配列のidをキューに渡します。配列全体をキューに送るよりもはるかに高速です。なぜなら、配列をpickle化することを避けるからです。上記のスレッドバージョンと似たパフォーマンス(約10%遅い)があり、グローバルインタプリタロックが問題である場合(つまり、関数で多くのPythonコードを実行した場合)、スケーラビリティが向上する可能性があります。サンプルの代わりに機能

上記コードの

from multiprocessing import Process, Queue, Array 
import numpy as np 

class ArrayQueue(object): 
    def __init__(self, template, maxsize=0): 
     if type(template) is not np.ndarray: 
      raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.') 
     if maxsize == 0: 
      # this queue cannot be infinite, because it will be backed by real objects 
      raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.') 

     # find the size and data type for the arrays 
     # note: every ndarray put on the queue must be this size 
     self.dtype = template.dtype 
     self.shape = template.shape 
     self.byte_count = len(template.data) 

     # make a pool of numpy arrays, each backed by shared memory, 
     # and create a queue to keep track of which ones are free 
     self.array_pool = [None] * maxsize 
     self.free_arrays = Queue(maxsize) 
     for i in range(maxsize): 
      buf = Array('c', self.byte_count, lock=False) 
      self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape) 
      self.free_arrays.put(i) 

     self.q = Queue(maxsize) 

    def put(self, item, *args, **kwargs): 
     if type(item) is np.ndarray: 
      if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count: 
       # get the ID of an available shared-memory array 
       id = self.free_arrays.get() 
       # copy item to the shared-memory array 
       self.array_pool[id][:] = item 
       # put the array's id (not the whole array) onto the queue 
       new_item = id 
      else: 
       raise ValueError(
        'ndarray does not match type or shape of template used to initialize ArrayQueue' 
       ) 
     else: 
      # not an ndarray 
      # put the original item on the queue (as a tuple, so we know it's not an ID) 
      new_item = (item,) 
     self.q.put(new_item, *args, **kwargs) 

    def get(self, *args, **kwargs): 
     item = self.q.get(*args, **kwargs) 
     if type(item) is tuple: 
      # unpack the original item 
      return item[0] 
     else: 
      # item is the id of a shared-memory array 
      # copy the array 
      arr = self.array_pool[item].copy() 
      # put the shared-memory array back into the pool 
      self.free_arrays.put(item) 
      return arr 

class __EndToken(object): 
    pass 

def parallel_pipeline(buffer_size=50): 
    def parallel_pipeline_with_args(f): 
     def consumer(xs, q): 
      for x in xs: 
       q.put(x) 
      q.put(__EndToken()) 

     def parallel_generator(f_xs): 
      q = ArrayQueue(template=np.zeros(0,1,(500,2000)), maxsize=buffer_size) 
      consumer_process = Process(target=consumer,args=(f_xs,q,)) 
      consumer_process.start() 
      while True: 
       x = q.get() 
       if isinstance(x, __EndToken): 
        break 
       yield x 

     def f_wrapper(xs): 
      return parallel_generator(f(xs)) 

     return f_wrapper 
    return parallel_pipeline_with_args 


@parallel_pipeline(3) 
def f(xs): 
    for x in xs: 
     yield x + 1.0 

@parallel_pipeline(3) 
def g(xs): 
    for x in xs: 
     yield x * 3 

@parallel_pipeline(3) 
def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

print "multiprocessing with shared-memory arrays:" 
%time print sum(r.sum() for r in f(g(h(xs())))) # 13.5s 

並列処理はシングルスレッドバージョン(以下に示すシリアルバージョン12.2S対14.8s)よりもわずかに約20%高速です。これは、各関数が単一のスレッドまたはプロセスで実行され、ほとんどの処理がxs()によって実行されるためです。上記の例の実行時間は、%time print sum(1 for x in xs())を実行した場合とほぼ同じです。

あなたの実際のプロジェクトには、もっと多くの中間機能があり、かつ/またはそれらがあなたが示したものよりも複雑な場合は、ワークロードがプロセッサ間でより良く分散されていても問題ありません。ただし、ワークロードが実際に提供したコードに似ている場合は、コードをリファクタリングして、各スレッドに1つの関数を割り当てる代わりに、各スレッドに1つのサンプルを割り当てることができます。

import multiprocessing 
import threading, Queue 
import numpy as np 

def f(x): 
    return x + 1.0 

def g(x): 
    return x * 3 

def h(x): 
    return x * x 

def final(i): 
    return f(g(h(x(i)))) 

def final_sum(i): 
    return f(g(h(x(i)))).sum() 

def x(i): 
    # produce sample number i 
    return np.random.uniform(0, 1, (500, 2000)) 

def rs_serial(func, n): 
    for i in range(n): 
     yield func(i) 

def rs_parallel_threaded(func, n): 
    todo = range(n) 
    q = Queue.Queue(2*n_workers) 

    def worker(): 
     while True: 
      try: 
       # the global interpreter lock ensures only one thread does this at a time 
       i = todo.pop() 
       q.put(func(i)) 
      except IndexError: 
       # none left to do 
       q.put(None) 
       break 

    threads = [] 
    for j in range(n_workers): 
     t = threading.Thread(target=worker) 
     t.daemon=False 
     threads.append(t) # in case it's needed later 
     t.start() 

    while True: 
     x = q.get() 
     if x is None: 
      break 
     else: 
      yield x 

def rs_parallel_mp(func, n): 
    pool = multiprocessing.Pool(n_workers) 
    return pool.imap_unordered(func, range(n)) 

n_workers = 4 
n_samples = 1000 

print "serial:" # 14.8s 
%time print sum(r.sum() for r in rs_serial(final, n_samples)) 
print "threaded:" # 10.1s 
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples)) 

print "mp return arrays:" # 19.6s 
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples)) 
print "mp return results:" # 8.4s 
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples)) 

このコードのねじバージョンが速く私が与えた最初の例よりも若干のみであり、シリアルよりも速く、わずか約30%:これは以下のコードのようになり(スレッドおよびマルチバージョンの両方が示されています)バージョン。それは私が期待したほどのスピードアップではありません。おそらくPythonはGILによってまだ部分的に渋滞していますか?

主に、すべての関数が中間結果をキューイング(および酸洗)するのではなく、1つのプロセスで連鎖しているため、マルチプロセッシングのバージョンは元のマルチプロセッシングコードよりも大幅に高速です。しかし、すべての結果配列がimap_unorderedによって返される前に、(ワーカープロセスで)pickleされ、(メインプロセスで)unpickleされなければならないので、シリアルバージョンよりもまだ遅いです。ただし、パイプラインが配列全体ではなく集計結果を返すように配置することができれば、酸漬けオーバーヘッドを避けることができ、マルチプロセッシングバージョンは最速です。シリアルバージョンよりも約43%高速です。

ここでは完全性のために、上記のより細かい関数の代わりにオリジナルのジェネレータ関数でマルチプロセッシングを使用する2番目の例のバージョンを示します。これは、いくつかのトリックを使用して複数のプロセスにサンプルを分散させるため、多くのワークフローには適さない場合があります。しかし、ジェネレータを使用すると、より細かい関数を使用するよりも少し速いようですが、この方法では上記のシリアルバージョンと比べて最大54%のスピードアップが得られます。ただし、ワーカー関数から完全な配列を返す必要がない場合にのみ使用できます。

import multiprocessing, itertools, math 
import numpy as np 

def f(xs): 
    for x in xs: 
     yield x + 1.0 

def g(xs): 
    for x in xs: 
     yield x * 3 

def h(xs): 
    for x in xs: 
     yield x * x 

def xs(): 
    for i in range(1000): 
     yield np.random.uniform(0,1,(500,2000)) 

def final(): 
    return f(g(h(xs()))) 

def final_sum(): 
    for x in f(g(h(xs()))): 
     yield x.sum() 

def get_chunk(args): 
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation.""" 
    return list(itertools.islice(args[0](), args[1])) 

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50): 
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes. 
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk) 
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory).""" 

    pool = multiprocessing.Pool(n_workers) 

    # how many chunks will be needed to yield at least max_items items? 
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size))) 

    # generate a suitable series of arguments for get_chunk() 
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks) 

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items) 
    chunk_gen = pool.imap_unordered(get_chunk, args_list) 

    # parallel_gen flattens the chunks, and yields individual items 
    parallel_gen = itertools.chain.from_iterable(chunk_gen) 

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items) 


# in this case, the parallel version is slower than a single process, probably 
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?) 
print "serial, return arrays:" # 15.3s 
%time print sum(r.sum() for r in final()) 
print "parallel, return arrays:" # 24.2s 
%time print sum(r.sum() for r in parallelize(final, max_items=1000)) 


# in this case, the parallel version is more than twice as fast as the single-thread version 
print "serial, return result:" # 15.1s 
%time print sum(r for r in final_sum()) 
print "parallel, return result:" # 6.8s 
%time print sum(r for r in parallelize(final_sum, max_items=1000)) 

+0

これの最初の文は私が知らなかった非常に有用なものです。 +1 –

0
あなたの例では、それが持っているかもしれないが、私のコンピュータ上で実行していないよう

私が窓を走っているという事実と関係がある(__main__名前空間(何も装飾されていない)には何も漬けません)...これは何か助けになりますか? (f()、g()、h()のそれぞれの中にパックを入れて解凍する必要があります)

注*これは実際にはもっと速くなるとは思わない...ちょうど他人示唆している。..

from multiprocessing import Process, freeze_support 
from multiprocessing.sharedctypes import Value, Array 
import numpy as np 

def package(arr): 
    shape = Array('i', arr.shape, lock=False) 

    if arr.dtype == float: 
     ctype = Value('c', b'd') #d for double #f for single 
    if arr.dtype == int: 
     ctype = Value('c', b'i') #if statements could be avoided if data is always the same 
    data = Array(ctype.value, arr.reshape(-1),lock=False) 

    return data, shape 

def unpack(data, shape): 
    return np.array(data[:]).reshape(shape[:]) 

#test 
def f(args): 
    print(unpack(*args)) 

if __name__ == '__main__': 
    freeze_support() 

    a = np.array([1,2,3,4,5]) 
    a_packed = package(a) 
    print('array has been packaged') 

    p = Process(target=f, args=(a_packed,)) 
    print('passing to parallel process') 
    p.start() 

    print('joining to parent process') 
    p.join() 
    print('finished') 
0

酸洗上の標準multiprocessing依存を回避する、Pathos-multiprocessing projectをチェックしてください。これにより、酸漬けの非効率性を回避し、読み取り専用の共有リソース用の共通メモリにアクセスできるようになります。 Pathosはフルピップパッケージでの展開に近づいていますが、その間にインストールすることをお勧めします。pip install git+https://github.com/uqfoundation/pathos

関連する問題