マルチプロセッシングパッケージ(Amazon EC2のUbuntu 12.04でnumpy 1.7.0を使用するPython 2.73)を使用して、単純なnumpyベースの行列代数計算を並列実行中に、 。私のコードは、小さな行列サイズでうまく動作しますが、使用可能なメモリが豊富です。マルチプロセッシングを使用してサブプロセスを実行しているときにシステムエラーが発生しました
私が使用する行列のサイズはかなりです(私のコードは1000000x10の浮動小数点型の行列では問題なく動作しますが、1000000x500のものはクラッシュします。ところで、これらの行列をサブプロセスに/から渡しています)。 10 vs 500はランタイムパラメータです。他のものはすべて同じままです(入力データ、その他の実行時パラメータなど)
python3を使用して同じ(移植された)コードを実行しようとしました。サブプロセスは(Python 2.7のようにクラッシュする代わりに)スリープ/アイドルモードになり、プログラム/サブプロセスは何もしません。小さな行列の場合、コードはpython3でうまく動作します。
任意の提案は非常に(私はここのアイデアが不足しています)いただければ幸いです
エラーメッセージ:
Exception in thread Thread-5: Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run() File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task) SystemError: NULL result without error in PyObject_Call
私が使用してマルチプロセッシングコード:以下
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
if len(listOfInputs) == 0:
return
# Add result queue to the list of argument tuples.
resultQueue = mp.Manager().Queue()
listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
# Create and initialize the pool of workers.
pool = mp.Pool(processes = nParallelProcesses)
pool.map(proc, listOfInputsNew)
# Run the processes.
pool.close()
pool.join()
# Return the results.
return [resultQueue.get() for i in range(len(listOfInputs))]
は " proc "は各サブプロセスに対して実行されます。基本的には、numpyを使って多くの線形方程式系を解きほぐし(サブプロセス内で必要な行列を構築する)、その結果を別の行列として返します。もう一度、1つの実行時パラメータの値が小さい場合はうまく動作しますが、大きい場合はクラッシュします(またはpython3でハングします)。
def solveForLFV(param):
startTime = time.time()
(chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
LFoutChunkSize = XY.shape[0]
nLFdim = LFVin.shape[1]
sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
for LFVoutIndex in xrange(LFoutChunkSize):
LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
sumLFVinOuterProductLFVpurch[:, :] = 0.
LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
queue.put((chunkI, LFVoutChunk))
print 'solveForLFV: ', time.time() - startTime, 'sec'
sys.stdout.flush()
proc関数のコードを共有できますか? – barracel
ちょうどしました。私は、これらの引数のいくつかは行列であり、いくつかはリストのリストであり、いくつかは浮動小数点/整数です。 'queue'は各サブプロセスからの結果を返すために使われます。 – Yevgeny