2012-03-29 4 views
0

私のMPIプログラムは、他のプロセスから0個以上のメッセージを送受信する多くのプロセスで構成されています。プロセスは、メッセージを処理できるかどうかを定期的にチェックします。コードは3000回まで繰り返し実行されます。その後、デッドロックやプログラムのフリーズが発生します。お気軽にご意見をお寄せください。私の疑似コードは以下の通りです。何か質問があれば教えてください。より高い反復ステップでデッドロックを回避するための提案が必要

Nは処理ノードの数である:

do{ 

    if(numberIterations>1) -- Receive Data 
    { 
     getdata: 
     MPI_Iprobe() 
     while(flagprobe !=0) 
     { 
      If(TAG=StausUpdate) 
       Update status of processor; 
      If(TAG=Data) 
       Process Data; 
      MPI_Iprobe() 
     } 
    } 

    if(numberIterations< MaxIterations) -- Send Data 
    { 
     for(i=0;i<N;i++) 
      MPI_Bsend_init(request[i]) 

     for(i=0;i<N;i++) 
      MPI_Start(request[i]) 

     numberIterations++; 
    } 

    if(numberIterations == MaxIterations) -- Update Processor Status 
    { 
     for(i=0;i<N;i++) 
      MPI_Isend(request1[i]) -- with TAG = StatusUpdate 

     goto getdata; 
     set endloopflag = 1 
    } 

    if(numberIterations == MaxIterations && endloopflag ==1) --Final Check 
    { 
     for(i=0;i<N;i++) 
      MPI_Test(request1[i],flagtest); 
     if(!flagtest) 
      goto getdata; 
    } 

} while(numberIterations < MaxIterations); 

for(i=0;i<N;i++) --Free request 
{ 
    MPI_Request_free(&request[i]); 
} 

--- MPI_Startの私の知識が少しさびが、それが対にされるべきではないマーク

#include <string.h> 
#include <math.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <time.h> 
#include <iostream> 
#include <fstream> 
#include "mpi.h" 
#define N 9      //# of nodes 
#define M 10      //samples number 
#define n 2      //demension of weight vector 
#define TAU 0.15 
#define DISTANCE 0.1    //measuremeant for two nodes 
#define A 0.2      //learning rate 
#define ITERATION_STEPS 1000  // Program goes for ITERATION_STEPS - 1 
#define SAMPLE_STEP 1    //Number of current iteration 
#define BT1 0.17 
#define BT2 0.02 
#define A0 0.9     //initial learning rate 
#define AC 0.05     //middle learning rate 
#define AF 0.001     //final learning rate 
#define TC 4500      //first period of iteration 
#define TF 5000     //second period of iteration 

#define BUFSIZE 400000 

using namespace std; 

void printtime(double comm_time,double update_time,string filename,int rank); 
int checkack(int ack[],int status[]); 
int checkstatus(int status[],int procid); 
int noof_activeproc(int status[],int myrank); 
void printresult(double w[][n],string filename,int rank); 
void plot(double w[][n], char* fileName); 
void update(double w[][n], double x[], int t,int rank, int g[][9]); 
double norm(double a[], double b[]); 
double p(double sample[], double w[], int t); 
void OneToTwo (int index, int *row, int *col); 
int g(int b, int j); 

int main(int argc, char *argv[]) 
{ 
     int rank,size;; 
     MPI_Init(&argc,&argv); 
     MPI_Comm_size(MPI_COMM_WORLD, &size); 
     MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
     MPI_Status statusprobe,status,status1[N],status2[N]; 
     MPI_Request request[N],request1[N],request2[N],request3[N]; //N request for N process per iteration 
     double buf[BUFSIZE]; // buffer for the outgoing message; 
     int procstatus[N],ack[N]; //store the process status and ack 
     double temp[n]; 
     double tempsend[n]; 
     ifstream in1, in2, in3; 
     ofstream out,outtime; 
     int i, j,k,z,req = 0,req1; 
     int checklocation=0; //bookmark for MPI_Test  
     int checklocation1=0; //bookmark for MPI_Test  
     int numberIterations; 
     double samples[M][n]; //for all samples 
     double w[N][n]; //for all node weight 
     double x[n]; //one sample 
     int g[9][9]; 
     int count=n; 
     int flagprobe=0; 
     int flagtest=1; 
     int flagrecv=0; 
     int datareadflag; // flag that checks wheter the data is read or not 
     double dataincount; 
     int checktestflag; 
     int requestfreeflag=0; 
     int flag=0; //test flag 
     int flagtest1=0; // check for the request to update the processor status 
     int endloopflag=0; 
     int *bptr, bl; 

     double start_time,end_time,tupdate_start,tupdate_end,t_temp1,t_temp2; 
     double comm_time; 
     double update_time=0; 

     for(i=0;i<N;i++) 
      { 
       procstatus[i]=1; // all the processor are on 
       ack[i]=0; 
      } 

     // read sample data 
     in1.open("samples.dat"); 
     if(!in1) 
     { 
     cout<<"100:File openning error. \n"; 
     exit(100); 
     } 

     in2.open("initialMap.dat"); 
     if(!in2) 
     { 
     cout<<"200:File openning error. \n"; 
     exit(200); 
     } 

     in3.open("gij.dat"); 
     if(!in3) 
     { 
     cout<<"200:File openning error. \n"; 
     exit(200); 
     } 

     for(i=0; i<M; i++) 
      for(j=0; j<n; j++){ 
       in1>>samples[i][j]; 
       //cout<<samples[i][j]<<"="<<i<<","<<j<<" "; 
       } 
     //read initial weights 
     for(i=0; i<N; i++) 
      for(j=0; j<n; j++) { 
      in2>>w[i][j]; 
      //cout<<w[i][j]<<"="<<i<<","<<j<<" "; 
      } 
     //read Gij 
     for(i=0; i<9; i++) 
      for(j=0; j<9; j++) { 
      in3>>g[i][j]; 
      //cout<<w[i][j]<<"="<<i<<","<<j<<" "; 
      } 
     //Print W to file 
      out.open("w.dot"); 
      out<<"graph G {"<<endl; 
      out<<"size=\"10,10\";"<<endl; 
      out<<"ratio=expand;"<<endl; 
      out<<"node [shape=circle];"<<endl; 
      //out<<"node [shape=point];"<<endl; 
      for(i=0; i<9; i++) { 
       for(j=0; j<n; j++) { 
       if(j == 0) out<<i+1<<"[pos = \""; 
       out<<w[i][j]; 
       if(j == 0) out<<","; 
       if(j == 1) out<<"!\"]"<<endl; 
       } 
      } 

      for(i=0; i<9; i++) 
       for(j=0; j<i+1; j++) { 
       if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl; 
       } 
      out<<"}"<<endl; 


     MPI_Barrier(MPI_COMM_WORLD);  
     MPI_Buffer_attach(buf, BUFSIZE); 
     k = 0; 
     numberIterations = 1; 
     dataincount=N; //for the first time , all process or has N data in from file. 
     datareadflag=1; 
     checktestflag=1; 
     int tagno=1; 
     int prevtag; //  start_time=MPI_Wtime(); 
     time_t start,start1,end1,end; 
     time(&start); 
     do{ 
       if(numberIterations%SAMPLE_STEP==0) 
        { 
         t_temp1=MPI_Wtime(); 
         if(k>=M) k=0; 
         for(j=0; j<n; j++) { 
         x[j]=samples[k][j]; 
        } 
         k++; 
        t_temp2=MPI_Wtime(); 
        }   

       if(numberIterations>1) 
       { getdata: 
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flagprobe, &statusprobe); //tag = numberIterations, 
         while(flagprobe != 0) 
         { 
         if(statusprobe.MPI_TAG==0) // tag=0 means status update of the processor 
          { 
           int rtemp[1]; 
           MPI_Recv(rtemp,1,MPI_INT,statusprobe.MPI_SOURCE,0, MPI_COMM_WORLD, &status);    
           procstatus[status.MPI_SOURCE]=rtemp[0]; 
          } 
          else 
           { 
           datareadflag=1; 
           dataincount++; 
           MPI_Recv(temp,count,MPI_DOUBLE,statusprobe.MPI_SOURCE,statusprobe.MPI_TAG, MPI_COMM_WORLD, &status);    
           for(j=0;j<n;j++) w[status.MPI_SOURCE][j]=temp[j]; 
           } 
         MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&flagprobe, &statusprobe); 
         } //end while 
        } //end if (no of iteration >1) 
       if(numberIterations< ITERATION_STEPS) // do not send on last iterations. 
         { 
          tupdate_start=MPI_Wtime(); 
          update(w,x,k,rank,g); 
          tupdate_end=MPI_Wtime(); 
          update_time=update_time+tupdate_end-tupdate_start;     

          if(req==0) { 
          for(i=0;i<N;i++) 
          { int c=0; 
           if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active 
           { 
           MPI_Bsend_init(w[rank], count, MPI_DOUBLE, i ,tagno, MPI_COMM_WORLD,&request[i]); 
           MPI_Bsend_init(&c,1, MPI_INT,i,0, MPI_COMM_WORLD,&request1[i]); 
           } 
          } //end for 
          req=1; 
          } 

          for(i=0;i<N;i++) 
          { 
           if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active 
           { 

            MPI_Start(&request[i]); 
             //actual message send. 
           } 
          } 

          tagno++; 
          requestfreeflag==1; 
          checktestflag=0; 
          dataincount=0; 
          checklocation=0; 
          numberIterations++; 
          datareadflag=0; 

          cout<<numberIterations<<"-th iterations for . "<<rank<<endl; 
         } //end if(numberIterations< ITERATION_STEPS) 

       /* Before exiting notify all the active process */ 
       if((numberIterations == ITERATION_STEPS) && (endloopflag==0)) //endloop flag prevent sending twice 
        { // status value (initially all 1); 
         req1=0; 
         for(i=0;i<N;i++) 
         { 
          if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active 
          { 
           MPI_Start(&request1[i]); 
          } 
         } 
         endloopflag=1; 
         goto getdata; 
         } //end if 

       if(numberIterations == ITERATION_STEPS && endloopflag==1) 
       { 
        for(i=1;i<N;i++) 
         { 
         if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active 
         { 
          MPI_Test(&request[i], &flagtest, &status); 
          MPI_Test(&request1[i], &flagtest1, &status); 
          if(!flagtest || !flagtest1) 
          { 
           checklocation1=i; //for next check continue from i; 
           cout<<"getdata called by" <<rank<<endl; 
           goto getdata; 
          }    
          } //end if 
         }//end for 
        } //end if 

     } while(numberIterations < ITERATION_STEPS); 

     for(i=0;i<N;i++) 
      { 
      if(i!=rank && request[i]!=MPI_REQUEST_NULL) 
      { 
       MPI_Request_free(&request[i]); 
       MPI_Request_free(&request1[i]); 
      } 
      } 

     if(numberIterations == ITERATION_STEPS) 
      { 
      char pno[2]; 
      sprintf(pno,"%d",rank); 
      string filename; 
      filename=filename+pno; 
      filename=filename+".dot"; 
      char *file=strdup(filename.c_str()); 
      ofstream out; 
      out.open(file); 
       //plot(w, "final_map_25.dat",rank); 
      out<<"graph G {"<<endl; 
      out<<"size=\"10,10\";"<<endl; 
      out<<"ratio=expand;"<<endl; 
      out<<"node [shape=point];"<<endl; 
      //out<<"node [shape=point];"<<endl; 
      for(i=0; i<9; i++) { 
       for(j=0; j<n; j++) { 
       if(j == 0) out<<i+1<<"[pos = \""; 
       out<<w[i][j]; 
       if(j == 0) out<<","; 
       if(j == 1) out<<"!\"]"<<endl; 
       } 
      } 

      for(i=0; i<9; i++) 
       for(j=0; j<i+1; j++) { 
       if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl; 
       } 
      out<<"}"<<endl; 
      } 

     MPI_Buffer_detach(&bptr, &bl); 
     MPI_Finalize(); 
return                                         0; 
} // End main Program 

答えて

1

に従って擬似コードを更新しますMPI_Wait(またはその変異体)であるか?待たずに、あなたのバッファがオーバーフローしているかどうか疑問に思っています。これは、あなたのプログラムが停止する前にしばらく実行されていることの説明です。エラーメッセージが表示されないので、ステートメントは自由に解釈できます。デッドロックがあり、プログラムフリーズは、バッファスペースがなくなったためにプログラムがフリーズする状況をカバーします。それがまさにデッドロックかどうかは疑問です。

+0

私は最後にMPI_Request_freeを使用して、プロセッサとバッファを解放します。私はバッファが問題を引き起こしていたと思っていましたが、最後のエラーメッセージはデッドロックのように見えます。 "mpirunはノードnode1082のPID 14656でプロセスランク1がシグナル9(Killed)を終了しました。 – Naga

+1

あなたの擬似コードは疑似正(おそらく)です。 –

+0

まだ同じ問題があります。プログラムはMPI_Test(&request [i]、&flagtest、&status)でループし続けます。 MPI_Test(&request1 [i]、&flagtest1、&status); goto getdata; – Naga

関連する問題