マルチプロセッシング
使用を処理します。つまり、マルチプロセッシングとプロセス間通信の代わりに、標準スレッドと共有メモリで並列処理を行うことができます。ここにあなたのコードのバージョンがあり、マルチプロセッシングの代わりにthreading.ThreadとQueue.Queueを使うよう調整されています。プロセスとマルチプロセッシング.Queue。これは、節約しないで、キューを介してnumpy ndarrayを渡します。私のコンピュータでは、これはあなたのコードより約3倍高速です。 (ただし、それは私がさらに下にいくつかの他のアプローチを示唆している。あなたのコードのシリアルバージョンよりも20%程度しか高速です。)
from threading import Thread
from Queue import Queue
import numpy as np
class __EndToken(object):
pass
def parallel_pipeline(buffer_size=50):
def parallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = Queue(buffer_size)
consumer_process = Thread(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parallel_pipeline_with_args
@parallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parallel_pipeline(3)
def g(xs):
for x in xs:
yield x * 3
@parallel_pipeline(3)
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs) # 12.2s
ストアnumpyのアレイ共有メモリに
ものに近い別のオプション、あなたが要求したのは、マルチプロセッシングパッケージの使用を継続することですが、共有メモリに格納された配列を使ってプロセス間でデータを渡します。以下のコードは、それを行うための新しいArrayQueueクラスを作成します。 ArrayQueueオブジェクトは、サブプロセスを生成する前に作成する必要があります。これは、共有メモリにバックアップされたnumpy配列のプールを作成および管理します。結果配列がキューにプッシュされると、ArrayQueueはその配列のデータを既存の共有メモリ配列にコピーし、共有メモリ配列のidをキューに渡します。配列全体をキューに送るよりもはるかに高速です。なぜなら、配列をpickle化することを避けるからです。上記のスレッドバージョンと似たパフォーマンス(約10%遅い)があり、グローバルインタプリタロックが問題である場合(つまり、関数で多くのPythonコードを実行した場合)、スケーラビリティが向上する可能性があります。サンプルの代わりに機能
上記コードの
from multiprocessing import Process, Queue, Array
import numpy as np
class ArrayQueue(object):
def __init__(self, template, maxsize=0):
if type(template) is not np.ndarray:
raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
if maxsize == 0:
# this queue cannot be infinite, because it will be backed by real objects
raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')
# find the size and data type for the arrays
# note: every ndarray put on the queue must be this size
self.dtype = template.dtype
self.shape = template.shape
self.byte_count = len(template.data)
# make a pool of numpy arrays, each backed by shared memory,
# and create a queue to keep track of which ones are free
self.array_pool = [None] * maxsize
self.free_arrays = Queue(maxsize)
for i in range(maxsize):
buf = Array('c', self.byte_count, lock=False)
self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
self.free_arrays.put(i)
self.q = Queue(maxsize)
def put(self, item, *args, **kwargs):
if type(item) is np.ndarray:
if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count:
# get the ID of an available shared-memory array
id = self.free_arrays.get()
# copy item to the shared-memory array
self.array_pool[id][:] = item
# put the array's id (not the whole array) onto the queue
new_item = id
else:
raise ValueError(
'ndarray does not match type or shape of template used to initialize ArrayQueue'
)
else:
# not an ndarray
# put the original item on the queue (as a tuple, so we know it's not an ID)
new_item = (item,)
self.q.put(new_item, *args, **kwargs)
def get(self, *args, **kwargs):
item = self.q.get(*args, **kwargs)
if type(item) is tuple:
# unpack the original item
return item[0]
else:
# item is the id of a shared-memory array
# copy the array
arr = self.array_pool[item].copy()
# put the shared-memory array back into the pool
self.free_arrays.put(item)
return arr
class __EndToken(object):
pass
def parallel_pipeline(buffer_size=50):
def parallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = ArrayQueue(template=np.zeros(0,1,(500,2000)), maxsize=buffer_size)
consumer_process = Process(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parallel_pipeline_with_args
@parallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parallel_pipeline(3)
def g(xs):
for x in xs:
yield x * 3
@parallel_pipeline(3)
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs())))) # 13.5s
並列処理はシングルスレッドバージョン(以下に示すシリアルバージョン12.2S対14.8s)よりもわずかに約20%高速です。これは、各関数が単一のスレッドまたはプロセスで実行され、ほとんどの処理がxs()によって実行されるためです。上記の例の実行時間は、%time print sum(1 for x in xs())
を実行した場合とほぼ同じです。
あなたの実際のプロジェクトには、もっと多くの中間機能があり、かつ/またはそれらがあなたが示したものよりも複雑な場合は、ワークロードがプロセッサ間でより良く分散されていても問題ありません。ただし、ワークロードが実際に提供したコードに似ている場合は、コードをリファクタリングして、各スレッドに1つの関数を割り当てる代わりに、各スレッドに1つのサンプルを割り当てることができます。
import multiprocessing
import threading, Queue
import numpy as np
def f(x):
return x + 1.0
def g(x):
return x * 3
def h(x):
return x * x
def final(i):
return f(g(h(x(i))))
def final_sum(i):
return f(g(h(x(i)))).sum()
def x(i):
# produce sample number i
return np.random.uniform(0, 1, (500, 2000))
def rs_serial(func, n):
for i in range(n):
yield func(i)
def rs_parallel_threaded(func, n):
todo = range(n)
q = Queue.Queue(2*n_workers)
def worker():
while True:
try:
# the global interpreter lock ensures only one thread does this at a time
i = todo.pop()
q.put(func(i))
except IndexError:
# none left to do
q.put(None)
break
threads = []
for j in range(n_workers):
t = threading.Thread(target=worker)
t.daemon=False
threads.append(t) # in case it's needed later
t.start()
while True:
x = q.get()
if x is None:
break
else:
yield x
def rs_parallel_mp(func, n):
pool = multiprocessing.Pool(n_workers)
return pool.imap_unordered(func, range(n))
n_workers = 4
n_samples = 1000
print "serial:" # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:" # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))
print "mp return arrays:" # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:" # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))
このコードのねじバージョンが速く私が与えた最初の例よりも若干のみであり、シリアルよりも速く、わずか約30%:これは以下のコードのようになり(スレッドおよびマルチバージョンの両方が示されています)バージョン。それは私が期待したほどのスピードアップではありません。おそらくPythonはGILによってまだ部分的に渋滞していますか?
主に、すべての関数が中間結果をキューイング(および酸洗)するのではなく、1つのプロセスで連鎖しているため、マルチプロセッシングのバージョンは元のマルチプロセッシングコードよりも大幅に高速です。しかし、すべての結果配列がimap_unorderedによって返される前に、(ワーカープロセスで)pickleされ、(メインプロセスで)unpickleされなければならないので、シリアルバージョンよりもまだ遅いです。ただし、パイプラインが配列全体ではなく集計結果を返すように配置することができれば、酸漬けオーバーヘッドを避けることができ、マルチプロセッシングバージョンは最速です。シリアルバージョンよりも約43%高速です。
ここでは完全性のために、上記のより細かい関数の代わりにオリジナルのジェネレータ関数でマルチプロセッシングを使用する2番目の例のバージョンを示します。これは、いくつかのトリックを使用して複数のプロセスにサンプルを分散させるため、多くのワークフローには適さない場合があります。しかし、ジェネレータを使用すると、より細かい関数を使用するよりも少し速いようですが、この方法では上記のシリアルバージョンと比べて最大54%のスピードアップが得られます。ただし、ワーカー関数から完全な配列を返す必要がない場合にのみ使用できます。
import multiprocessing, itertools, math
import numpy as np
def f(xs):
for x in xs:
yield x + 1.0
def g(xs):
for x in xs:
yield x * 3
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
def final():
return f(g(h(xs())))
def final_sum():
for x in f(g(h(xs()))):
yield x.sum()
def get_chunk(args):
"""Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list.
This runs in a worker process and does all the computation."""
return list(itertools.islice(args[0](), args[1]))
def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
"""Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""
pool = multiprocessing.Pool(n_workers)
# how many chunks will be needed to yield at least max_items items?
n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))
# generate a suitable series of arguments for get_chunk()
args_list = itertools.repeat((gen_func, chunk_size), n_chunks)
# chunk_gen will yield a series of chunks (lists of results) from the generator function,
# totaling n_chunks * chunk_size items (which is >= max_items)
chunk_gen = pool.imap_unordered(get_chunk, args_list)
# parallel_gen flattens the chunks, and yields individual items
parallel_gen = itertools.chain.from_iterable(chunk_gen)
# limit the output to max_items items
return itertools.islice(parallel_gen, max_items)
# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:" # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:" # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))
# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:" # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:" # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))
いくつかのコードを共有できますか? –
実際のコードではありません。ウィルも同様のことを模索するだろう。 –
それは[最小、完全で、検証可能な例](http://stackoverflow.com/help/mcve)... –