2017-12-20 11 views
0

Pythonの3並行データローダーのためのきれいな、pythonicな方法?

私は本当にきれいで、ニシキヘビの同時データローダーがどのように見えるか知っていただきたいと思います。私は、メモリに完全に収まるほど大きすぎるデータの重い計算を行う私のプロジェクトにこのアプローチが必要です。したがって、同時に実行し、キューにデータを格納するデータローダーを実装したので、次のデータがロードされている間にメインプロセスが動作するようになりました(&)。もちろん、キューが空の場合(主プロセスは多くのアイテムを消費しようとする - >キューは新しいデータを待つ必要があります)、またはフル(ワーカープロセスは、メインプロセスがキューからデータを消費して、メモリエラー)。

私はPythonのmultiprocessingモジュール(multiprocessing.Queuemultiprocessing.Process)を使用して、この必要性を満たすためのクラスを作成しました。次のようにクラスの重要な部分は実装されています:

import multiprocessing as mp 
from itertools import cycle  

class ConcurrentLoader: 
    def __init__(path_to_data, queue_size, batch_size): 
     self._batch_size 
     self._path = path_to_data 
     filenames = ... # filenames for path 'path_to_data', 
         # get loaded using glob 
     self._files = cycle() 
     self._q = mp.Queue(queue_size) 
     ... 
     self._worker = mp.Process(target=self._worker_func, daemon=True) 
     self._worker.start() # only started, never stopped 

    def _worker_func(self): 
     while True: 
      buffer = list() 
      for i in range(batch_size): 
       f = next(self._files) 
       ... # load f and do some pre-processing with NumPy 
       ... # add it to buffer 
      self._q.put(np.array(buffer).astype(np.float32)) 

    def get_batch_data(self): 
     self._q.get() 

クラスには、いくつかのより多くの方法がありますが、それらはすべて「便利機能」のためのものです。たとえば、各ファイルの読み込み頻度、データセット全体の読み込み回数などを計算しますが、これはPythonで実装するのが簡単で、計算時間を無駄にしません(set、dicts、..)。 )。

一方、I/Oと前処理のためにデータ部分自体は数秒かかる場合もあります。それが私がこれを同時に起こさせたい理由です。

ConcurrentLoaderべきである:get_batch_dataが呼ばれますが、キューが空である場合

  • ブロックワーカープロセス:

    • ブロックメインプロセスは、キューがいっぱいの場合、メモリ不足エラーを防止してからwhile Trueを防ぐためにリソースを浪費する
    • は、ConcurrentLoaderを使用するすべてのクラスに対して透過的である必要があります。実際には同時に動作することに気づかずに、データのパスを指定してget_batch_dataを使用するだけです(
    • メインプロセスが再び

    は、これらの目標を(私は何かを忘れてしまった?)私は現在の実装を強化するために何をすべきかを考えると、リソースを解放するために死ぬと

  • は、その労働者を終了しますか?スレッド/デッドロックは安全ですか?もっと「ピジョンソニック」な実装方法がありますか?もっときれいにすることはできますか?何とかリソースを浪費していますか?

    ConcurrentLoaderは大体このセットアップをたどる使用するすべてのクラス:

    class Foo: 
        ... 
    
        def do_something(self): 
         ... 
         data1 = ConcurrentLoader("path/to/data1", 64, 8) 
         data2 = ConcurrentLoader("path/to/data2", 256, 16) 
         ... 
         sample1 = data1.get_batch_data() 
         sample2 = data2.get_batch_data() 
         ... # heavy computations with data contained in 'sample1' & 'sample2' 
          # go *here* 
    

    私のアプローチを改善したり、独自の、クリーナー、より多くのニシキヘビのアプローチを供給するために、あらゆる種類のミスを指摘してくださいどちらか。

    multiprocessing.Queueがいっぱい/空であり、それは自動的に行わ上 get()/put()が呼び出されたときにブロッキング
  • 答えて

    1
    • この動作は、呼び出し関数には透過的です。

    • self._worker.start()使用前self._worker.daemon = Trueメインプロセスは

    を終了すると作業者(s)が自動的に殺されます
    関連する問題