2013-02-27 56 views
8

マルチプロセッシングパッケージ(Amazon EC2のUbuntu 12.04でnumpy 1.7.0を使用するPython 2.73)を使用して、単純なnumpyベースの行列代数計算を並列実行中に、 。私のコードは、小さな行列サイズでうまく動作しますが、使用可能なメモリが豊富です。マルチプロセッシングを使用してサブプロセスを実行しているときにシステムエラーが発生しました

私が使用する行列のサイズはかなりです(私のコードは1000000x10の浮動小数点型の行列では問題なく動作しますが、1000000x500のものはクラッシュします。ところで、これらの行列をサブプロセスに/から渡しています)。 10 vs 500はランタイムパラメータです。他のものはすべて同じままです(入力データ、その他の実行時パラメータなど)

python3を使用して同じ(移植された)コードを実行しようとしました。サブプロセスは(Python 2.7のようにクラッシュする代わりに)スリープ/アイドルモードになり、プログラム/サブプロセスは何もしません。小さな行列の場合、コードはpython3でうまく動作します。

任意の提案は非常に(私はここのアイデアが不足しています)いただければ幸いです

エラーメッセージ:

Exception in thread Thread-5: Traceback (most recent call last): 
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() File "/usr/lib/python2.7/threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks 
    put(task) SystemError: NULL result without error in PyObject_Call 

私が使用してマルチプロセッシングコード:以下

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses): 
    if len(listOfInputs) == 0: 
     return 
    # Add result queue to the list of argument tuples. 
    resultQueue = mp.Manager().Queue() 
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs] 
    # Create and initialize the pool of workers. 
    pool = mp.Pool(processes = nParallelProcesses) 
    pool.map(proc, listOfInputsNew) 
    # Run the processes. 
    pool.close() 
    pool.join() 
    # Return the results. 
    return [resultQueue.get() for i in range(len(listOfInputs))] 

は " proc "は各サブプロセスに対して実行されます。基本的には、numpyを使って多くの線形方程式系を解きほぐし(サブプロセス内で必要な行列を構築する)、その結果を別の行列として返します。もう一度、1つの実行時パラメータの値が小さい場合はうまく動作しますが、大きい場合はクラッシュします(またはpython3でハングします)。

def solveForLFV(param): 
    startTime = time.time() 
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param 
    LFoutChunkSize = XY.shape[0] 
    nLFdim = LFVin.shape[1] 
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim)) 
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim)) 
    for LFVoutIndex in xrange(LFoutChunkSize): 
     LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex] 
     sumLFVinOuterProductLFVpurch[:, :] = 0. 
     LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize) 
     for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)): 
      LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :] 
      sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]) 
     LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :]) 
    queue.put((chunkI, LFVoutChunk)) 
    print 'solveForLFV: ', time.time() - startTime, 'sec' 
    sys.stdout.flush() 
+0

proc関数のコードを共有できますか? – barracel

+0

ちょうどしました。私は、これらの引数のいくつかは行列であり、いくつかはリストのリストであり、いくつかは浮動小数点/整数です。 'queue'は各サブプロセスからの結果を返すために使われます。 – Yevgeny

答えて

5

500,000,000はかなり大きく、float64を使用している場合は40億バイト、つまり約4GBです。 (10,000,000の浮動小数点配列は8000万バイト、つまり約80MBになります。)問題は、パイプを介してサブプロセスに送るために配列をピックアップしようとするマルチプロセッシングと関係していると思います。

Unixプラットフォームを使用しているので、fork()(マルチプロセッシングのワーカーの作成に使用)のメモリ継承動作を利用することで、この動作を回避できます。私はこのハック(裂けたthis project)で大きな成功を収めました。

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count()) 
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name] 
関連する問題