0

私は状態ベクトルのスナップショットを保存し、異なるパラメータに対して計算を実行するシミュレーションを行っています。私はスキャンするための2つの制御パラメータを持っています(この例ではpとa)。したがって、2つの次元が2つの制御パラメータ用である1つのnetCDF4ファイルのシミュレーション結果を保存します。 1つのパラメータセットのシミュレーションを実行すると、ファイルは正しく保存されますが、apply_asyncmultiprocessingから実行しようとすると、プロセスの最後にnetCDF4がアクセスできなくなります。netCDF4ファイルにパラレル保存するPythonマルチプロセッシング

私の完全なコードは、このgithub repositoryであるが、基本的に私は何をしようとしているこれです:少なくとも二つの可能なアプローチがあり

import multiprocessing as mp 
import time as timer 
import netCDF4 
import numpy as np 
def run_sim_for_p_a(p,a,pstep,astep,step,max_time,u0,fname): 
    time_ar=np.arange(0,max_time,step) 
    u = np.ones((len(time_ar),1024)) 
    u[0]=u0 
    print "Calculating for p,a:",p,a 
    for i,t in enumerate(time_ar[1:]): 
     u[i+1] = u[i]*np.cos(t)*np.sin(a)*np.sin(p) 
    for tstep,t in enumerate(time_ar): 
     save_p_a_snapshot(fname,pstep,astep,tstep,p,a,t,u[tstep]) # save the results into the netCDF4 file 

def apply_async_and_save_grid(pmin,pmax,fname, 
           Np=10,Na=10, 
           step=None,max_time=500.0,numproc=10): 
    start = timer.time() 
    setup_p_a_scan(fname) # Setup a netCDF4 file for the simulations 
    if step is None: 
     step=max_time 
    p_range = np.linspace(pmin,pmax,Np) 
    init = np.random.random((1024)) 
    a_range = np.linspace(0,1,Na) 
    availble_cpus = int(available_cpu_count() - 2) 
    numproc=min(numproc,availble_cpus) 
    print "Using",numproc," processors" 
    pool = mp.Pool(processes=numproc) 
    for i,p in enumerate(p_range): 
     for j,a in enumerate(a_range): 
      pool.apply_async(run_sim_for_p_a, 
          args = (p,a,i,j,step,max_time,init,fname)) 
    pool.close() 
    pool.join() 
    print "Took ",timer.time()-start 
if __name__=="__main__": 
    apply_async_and_save_grid(1.0,2.0,"test",Np=2,Na=4,step=1.0,max_time=10) 
+1

複数のプロセスが非同期で実行されていますが、同じディスクファイルにアクセスしていません。このトーク([並行処理に関するKeynote](https://www.youtube.com/watch?v=9zinZmE3Ogk) – wwii

+0

はい、プロセス間で通信は行われませんが、結果は同じディスクファイルに保存されます。 – Ohm

答えて

1

は、あなたは、各ワーカー・プロセスの書き込みを持つことができますその結果を独自のnetCDF4ファイルに書き込んで、すべての作業者が終了した後でメインプログラムをマージします。

私はnetCDF形式に精通していませんが、そのようなファイルに追加することができると仮定すると、apply_asyncを開始する前にmultiprocessing.Lockを作成する可能性があります。 このロックは、ワーカープロセス のパラメータに追加する必要があります。ワーカープロセスは、ロックをacquireにして、netcdfファイルを開いて書き込み、閉じます。それでは、ロックをreleaseする必要があります。これにより、一度に1つのプロセスだけがnetCDFファイルに書き込まれます。

編集PoolLockを処理する方法にthis questionに対する回答を参照してください。

+0

優れています。私はワーカーのプールを使用しているので、マネージャまたはグローバルロックのいずれかを追加する必要があったので、このスレッドを参照しました - https://stackoverflow.com/questions/25557686/python-sharing-a-プロセス間ロック – Ohm

+0

@Ohm Excellent find!いくつかの微妙なw.r.tがあります。私が実現しなかった「プール」と「ロック」。私は一致する私の答えを更新しました。 –

関連する問題