2016-12-30 5 views
0

実行するタスクの数がコアの数よりも多い場合は、コアにジョブを割り当てるMPIのpythonスクリプト(オンラインで見つけた)があります。MPI-Pythonの各コアからの出力を収集し、.npyファイルとして保存する方法は?

from mpi4py import MPI 

import SubProgram 
import numpy as np 

Output={}; 
def enum(*sequential, **named): 

    enums = dict(zip(sequential, range(len(sequential))), **named) 
    return type('Enum',(), enums) 

# Define MPI message tags 
tags = enum('READY', 'DONE', 'EXIT', 'START') 

# Initializations and preliminaries 
comm = MPI.COMM_WORLD # get MPI communicator object 
size = comm.size  # total number of processes 
rank = comm.rank  # rank of this process 
status = MPI.Status() # get MPI status object 


if rank == 0: 
    # Master process executes code below 
    tasks = range(2) 
    task_index = 0 
    num_workers = size - 1 
    closed_workers = 0 
    print("Master starting with %d workers" % num_workers) 
    while closed_workers < num_workers: 
     data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 

     source = status.Get_source() 
     tag = status.Get_tag() 
     #print(source,tag) 
     if tag == tags.READY: 
      # Worker is ready, so send it a task 
      if task_index < len(tasks): 
       comm.send(tasks[task_index], dest=source, tag=tags.START) 
       print("Sending task %d to worker %d" % (task_index, source)) 
       task_index += 1 
      else: 
       comm.send(None, dest=source, tag=tags.EXIT) 
     elif tag == tags.DONE: 
      results = data 
      print("Got data from worker %d" % source) 
     elif tag == tags.EXIT: 
      print("Worker %d exited." % source) 
      closed_workers += 1 

    print("Master finishing") 
else: 
    # Worker processes execute code below 
    name = MPI.Get_processor_name() 
    print("I am a worker with rank %d on %s." % (rank, name)) 
    while True: 

     comm.send(None, dest=0, tag=tags.READY) 
     task = comm.recv(source=0, tag=MPI.ANY_SOURCE, status=status) 
     tag = status.Get_tag() 



     if tag == tags.START: 
      #calling the subprogram and the subprogram returns a list 
      Output[rank]=SubProgram.main(rank,name); 

      NewOutput=comm.gather(Gene,root=0) 

      result = task**2 
      comm.send(result, dest=0, tag=tags.DONE) 
     elif tag == tags.EXIT: 
      break 

    comm.send(None, dest=0, tag=tags.EXIT) 
if rank==0: 
    np.save('Output.npy',NewOutput) 

サブプログラムは、配列のリストを返す関数であり、各コアは固有の配列を返します。このようなもの: -

[array([ 1, 9613, 13037, 6789], dtype=int64), 
array([ 95, 5648, ..., 2387, 6790], dtype=int64), 
array([ 509, 1948, 2541, 2075], dtype=int64), 
array([ 594, 12091], dtype=int64), 
array([ 786, 4370, 8830, 5002, 5948, 11969], dtype=int64), 
array([ 841, 4324, 9761, 7397, 6367], dtype=int64)] 

したがって、10コアで実行すると1つのリストが得られます。しかし、上記のプログラムを使用すると、結果は.npyに保存されません。 私は並列プログラミングには新しいです。誰でも進める方法を教えてください。

{0:[array([ 1, 9613, 13037, 6789], dtype=int64), 
array([ 95, 5648, ..., 2387, 6790], dtype=int64), 
array([ 509, 1948, 2541, 2075], dtype=int64), 
array([ 594, 12091], dtype=int64), 
array([ 786, 4370, 8830, 5002, 5948, 11969], dtype=int64), 
array([ 841, 4324, 9761, 7397, 6367], dtype=int64)], 


1: [array([ 1, 843, 4665, ..., 9613, 13037, 6789], dtype=int64), 
array([ 95, 1939, 5648, ..., 2387, 5920, 6790], dtype=int64), 
array([ 509, 1948, 2541, 5417, 2421, 11452, 12863, 2075], dtype=int64), 
array([ 594, 3364, 12081, 7746, 2286, 9719, 12091], dtype=int64), 
array([ 786, 4370, 8830, 5002, 5948, 11969], dtype=int64), 
array([ 841, 4324, 9761, 7389, 7697, 6367], dtype=int64)], 


2: [array([ 1, 843, 4665, ..., 9613, 13037, 6789], dtype=int64), 
array([ 95, 1939, 5648, ..., 2387, 5920, 6790], dtype=int64), 
array([ 509, 1948, 2541, 5417, 2421, 11452, 12863, 2075], dtype=int64), 
array([ 594, 3364, 12081, 7746, 2286, 9719, 12091], dtype=int64), 
array([ 786, 4370, 8830, 5002, 5948, 11969], dtype=int64), 
array([ 841, 4324, 9761, 7389, 7697, 6367], dtype=int64)] 
................................. 

10:[array([10,63,89........],dtype=int64)]}   ] 

はあなたのすべてをありがとう:

などについて辞書としてこれらの出力を収集し、それを格納する方法はあります。

答えて

1

私はそれは

 elif tag == tags.DONE: 

      print("Got data from worker %d" % source) 

      if source not in results: 
       results[source] = [] 
      results[source].append(data) 

全コード

を使用してコードを取り、変数 resultsに私にこの

{ 
1: [array([0]), array([16]), array([49])], 
2: [array([9]), array([36]), array([81])], 
3: [array([1])], 
4: [array([4]), array([25]), array([64])] 
} 

のようなものを与える作業例を作成するために

Output[rank]=SubProgram.main(rank,name); 
NewOutput=comm.gather(Gene,root=0) 

を削除

from mpi4py import MPI 
import numpy as np 

def enum(*sequential, **named): 
    enums = dict(zip(sequential, range(len(sequential))), **named) 
    return type('Enum',(), enums) 

# Define MPI message tags 
tags = enum('READY', 'DONE', 'EXIT', 'START') 

# Initializations and preliminaries 
comm = MPI.COMM_WORLD # get MPI communicator object 
size = comm.size  # total number of processes 
rank = comm.rank  # rank of this process 
status = MPI.Status() # get MPI status object 

if rank == 0: 
    # Master process executes code below 
    tasks = range(10) 
    task_index = 0 

    num_workers = size - 1 
    closed_workers = 0 

    results = dict() 

    print("Master starting with %d workers" % num_workers) 

    while closed_workers < num_workers: 

     data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 
     source = status.Get_source() 
     tag = status.Get_tag() 

     #print(source, tag) 

     if tag == tags.READY: 

      # Worker is ready, so send it a task 
      if task_index < len(tasks): 
       comm.send(tasks[task_index], dest=source, tag=tags.START) 
       print("Sending task %d to worker %d" % (task_index, source)) 
       task_index += 1 
      else: 
       comm.send(None, dest=source, tag=tags.EXIT) 

     elif tag == tags.DONE: 

      print("Got data from worker %d" % source) 
      if source not in results: 
       results[source] = [] 
      results[source].append(data) 

     elif tag == tags.EXIT: 

      print("Worker %d exited." % source) 
      closed_workers += 1 

    print("Master finishing") 

    print(results) 

else: 
    # Worker processes execute code below 
    name = MPI.Get_processor_name() 

    print("I am a worker with rank %d on %s." % (rank, name)) 

    while True: 

     comm.send(None, dest=0, tag=tags.READY) 

     data = comm.recv(source=0, tag=MPI.ANY_SOURCE, status=status) 
     #source = status.Get_source() 
     tag = status.Get_tag() 

     #print(source, tag) 

     if tag == tags.START: 

      # create some example array 
      result = np.array([data])**2 

      comm.send(result, dest=0, tag=tags.DONE) 

     elif tag == tags.EXIT: 

      comm.send(None, dest=0, tag=tags.EXIT) 
      break 
+0

ありがとうございます。ほんとうにありがとう。 – RG20

関連する問題