Pythonの3並行データローダーのためのきれいな、pythonicな方法?
私は本当にきれいで、ニシキヘビの同時データローダーがどのように見えるか知っていただきたいと思います。私は、メモリに完全に収まるほど大きすぎるデータの重い計算を行う私のプロジェクトにこのアプローチが必要です。したがって、同時に実行し、キューにデータを格納するデータローダーを実装したので、次のデータがロードされている間にメインプロセスが動作するようになりました(&)。もちろん、キューが空の場合(主プロセスは多くのアイテムを消費しようとする - >キューは新しいデータを待つ必要があります)、またはフル(ワーカープロセスは、メインプロセスがキューからデータを消費して、メモリエラー)。
私はPythonのmultiprocessing
モジュール(multiprocessing.Queue
とmultiprocessing.Process
)を使用して、この必要性を満たすためのクラスを作成しました。次のようにクラスの重要な部分は実装されています:
import multiprocessing as mp
from itertools import cycle
class ConcurrentLoader:
def __init__(path_to_data, queue_size, batch_size):
self._batch_size
self._path = path_to_data
filenames = ... # filenames for path 'path_to_data',
# get loaded using glob
self._files = cycle()
self._q = mp.Queue(queue_size)
...
self._worker = mp.Process(target=self._worker_func, daemon=True)
self._worker.start() # only started, never stopped
def _worker_func(self):
while True:
buffer = list()
for i in range(batch_size):
f = next(self._files)
... # load f and do some pre-processing with NumPy
... # add it to buffer
self._q.put(np.array(buffer).astype(np.float32))
def get_batch_data(self):
self._q.get()
クラスには、いくつかのより多くの方法がありますが、それらはすべて「便利機能」のためのものです。たとえば、各ファイルの読み込み頻度、データセット全体の読み込み回数などを計算しますが、これはPythonで実装するのが簡単で、計算時間を無駄にしません(set、dicts、..)。 )。
一方、I/Oと前処理のためにデータ部分自体は数秒かかる場合もあります。それが私がこれを同時に起こさせたい理由です。
ConcurrentLoader
べきである:get_batch_data
が呼ばれますが、キューが空である場合
- ブロックメインプロセスは、キューがいっぱいの場合、メモリ不足エラーを防止してから
while True
を防ぐためにリソースを浪費する - は、
ConcurrentLoader
を使用するすべてのクラスに対して透過的である必要があります。実際には同時に動作することに気づかずに、データのパスを指定してget_batch_data
を使用するだけです( メインプロセスが再び
は、これらの目標を(私は何かを忘れてしまった?)私は現在の実装を強化するために何をすべきかを考えると、リソースを解放するために死ぬと
ConcurrentLoader
は大体このセットアップをたどる使用するすべてのクラス:
class Foo:
...
def do_something(self):
...
data1 = ConcurrentLoader("path/to/data1", 64, 8)
data2 = ConcurrentLoader("path/to/data2", 256, 16)
...
sample1 = data1.get_batch_data()
sample2 = data2.get_batch_data()
... # heavy computations with data contained in 'sample1' & 'sample2'
# go *here*
私のアプローチを改善したり、独自の、クリーナー、より多くのニシキヘビのアプローチを供給するために、あらゆる種類のミスを指摘してくださいどちらか。
multiprocessing.Queue
がいっぱい/空であり、それは自動的に行わ上 get()
/put()
が呼び出されたときにブロッキング