0

私は分散プログラムを使いました。ネットワーク内のすべてのノード(仮想マシン)は、他のすべてのノードとの間でデータを(送信接続を介して)送信し、(受信接続を介して)データを受信します。データを送信する前に、すべてのノードが他のすべてのノード(単一のソースノードを含む)へのソケットを持っています。 3秒の遅延の後、送信元はネットワーク内の他のノードのそれぞれに異なるファイルチャンクを送信し始めます。すべてのノードは、最初のパケットが到着した後に受信チャンクを転送し始める。ピアによる接続リセット[Errno 104] in Python

プログラムはエラーなしで何度も正​​常に終了します。しかし、時には1つのランダムなノードが(データを送信しているコネクションを通じてデータを送信しながら)コネクションをリセットすることがあります。

各ノードには、n-2送信側スレッドとn-1受信側スレッドがあります。

送信機能:

def relaySegment_Parallel(self): 
     connectionInfoList = [] 
     seenSegments = [] 
     readyServers = [] 
     BUFFER_SIZE = Node.bufferSize 
     while len(readyServers) < self.connectingPeersNum-len(Node.sources) and self.isMainThreadActive(): #Data won't be relayed to the sources 
      try: 
       tempIp = None 
       for ip in Node.IPAddresses: 
        if ip not in readyServers and ip != self.ip and ip not in self.getSourcesIp(): 
         tempIp = ip 
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
         s.connect((ip, Node.dataPort)) 
         connectionInfoList.append((s, ip)) 
         readyServers.append(ip) 
         if Node.debugLevel2Enable: 
          print "RelayHandler: Outgoing connection established with IP: " + str(ip) 
      except socket.error, v: 
       errorcode = v[0] 
       if errorcode == errno.ECONNRESET: 
        print "(RelayHandler) Connection reset ! Node's IP: " + str(tempIp) 
       if errorcode == errno.ECONNREFUSED: 
        print "(RelayHandler) Node " + str(tempIp) + " are not ready yet!" 
       continue 
      except: 
       print "Error: Cannot connect to IP: " + str (tempIp) 
       continue 
      print "(RelayHandler) Ready to relay data to " + str(len(readyServers)) + " numeber of servers." 
     try: 
      pool = ThreadPool(processes = Node.threadPoolSize) 
      while Node.terminateFlag == 0 and not self.isDistributionDone() and self.isMainThreadActive(): 
       if len(self.toSendTupleList) > 0: 
        self.toSendLock.acquire() 
        segmentNo, segmentSize, segmentStartingOffset, data = self.toSendTupleList.pop(0) 
        self.toSendLock.release() 
        if len(data) > 0: 
         if segmentNo not in seenSegments: 
          #Type: 0 = From Sourece , 1 = From Rlayer 
          #Sender Type/Segment No./Segment Size/Segment Starting Offset/ 
          tempList = [] 
          for s, ip in connectionInfoList: 
           tempData = "1/" + str(self.fileSize) + "/" + str(segmentNo) + "/" + str(segmentSize) + "/" + str(segmentStartingOffset) + "/" 
           tempList.append((s, ip, tempData)) 
          pool.map(self.relayWorker, tempList) 
          seenSegments.append(segmentNo) 
         relayList = [] 
         for s, ip in connectionInfoList: 
          relayList.append((s, ip, data)) 
         pool.map(self.relayWorker, relayList) 
      for s, ip in connectionInfoList: 
       s.shutdown(1)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
       s.close() 
      pool.close() 
      pool.join() 
     except socket.error, v: 
      errorcode=v[0] 
      if errorcode==errno.ECONNREFUSED: 
       print "(RelayHandler) Error: Connection Refused in RelaySegment function. It can not connect to: ", ip 
      else: 
       print "\n(RelayHandler) Error1 in relaying segments (Parallel) to ", ip, " !!! ErrorCode: ", errorcode 
      traceback.print_exception(*sys.exc_info()) 
     except: 
      print "\n(RelayHandler) Error2 in relaying segments (Parallel) to ", ip 
      traceback.print_exception(*sys.exc_info()) 

受信機能:

def receiveDataHandler(self): 
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# Allows us to resue the port immediately after termination of the program 
      s.bind((self.ip, Node.dataPort)) 
      s.listen(Node.MaxNumClientListenedTo) 
      threadsList = [] 
      fHandler = fileHandler(self.inFileAddr, Node.bufferSize) 
      isStart = False 
      executionTime = 0 
      connectedPeersSofar = 0 
      while (not self.connectingPeersNum == connectedPeersSofar) and self.isMainThreadActive() and Node.terminateFlag == 0 and not self.isReceptionDone(): 
       conn, ipAddr = s.accept() 
       thread_receiveData = Thread2(target = self.receiveData_Serial, args = (conn, ipAddr, fHandler)) 
       thread_receiveData.start() 
       if Node.debugLevel2Enable: 
        print 'Receive Handler: New thread started for connection from address:', ipAddr 
       connectedPeersSofar += 1 
       threadsList.append(thread_receiveData) 
       if isStart == False: 
        isStart = True 
      print "(RecieiverHandeler) Receiver stops listening: Peers Num "+str(self.connectingPeersNum) +i " connected peers so far: " + str(connectedPeersSofar) 
      for i in range(0, len(threadsList)): 
       self.startTime = threadsList[i].join() 
      if isStart: 
       executionTime = float(time.time()) - float(self.startTime) 
      else: 
       print "\n\t No Start! Execution Time: --- 0 seconds ---" , "\n" 
      s.shutdown(2)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
      s.close() 
      return executionTime 
     except socket.error, v: 
      errorcode = v[0] 
      if errorcode == 22: # 22: Invalid arument 
       print "Error: Invalid argument in connection acceptance (receive data handler)" 
      elif errorcode==errno.ECONNREFUSED: 
       print "Error: Connection Refused in receive" 
      else: 
       print "Error1 in Data receive Handler !!! ErrorCode: ", errorcode 
      traceback.print_exception(*sys.exc_info()) 
     except: 
      print "Error2 in Data receive Handler !!!" 
      traceback.print_exception(*sys.exc_info()) 

ノードが(ランダム誤動作ノードを含む)他のすべてのノードに接続されている全てのノードプリントの送信スレッド。しかしながら、ランダムなノードの受信機能は

s.accept()

に待機し、任意の接続が、接続するための最後のものである単一のソースからの接続を受け付けません。ランダムノードは、例外を発生させることなく待機します。

s.accept()

は、いずれかが、最後の1を受け入れていない一方で、ランダムなノードの

s.listen()

(TCPのprotocole)は、送信者は、それらが接続されていることを考えさせるようです。次に、何らかの理由でコネクションをリセットするため、他の人(送信者)がデータを送信しようとすると「Connection reset by peer」例外が送出されるのです。何のエラーもなく仕事を終える唯一の送信者は、最後に接続する送信元です。

エラー:なぜ起こっていることを

Traceback (most recent call last): 
File "/home/ubuntu/DCDataDistribution/Node.py", line 137, in relayWorker 
socketConn.sendall(data) 
File "/usr/lib/python2.7/socket.py", line 224, in meth 
return getattr(self._sock,name)(*args) 
error: [Errno 104] Connection reset by peer 

FYI: Amazon EC2インスタンスでプログラムを実行しています。各インスタンスのタイプはt2.micro(1 vCPU、2.5 GHz、Intel Xeonファミリ(最大3.3 GHz)、1 GiBメモリ)です。 Ubuntu Server 14.04 LTS(HVM)はすべてのインスタンスで実行されています。

+0

ここで診断するコードは不十分で、説明は少し難しいです。推測のように、あなたの "ランダムな"受信ノードが間違ったソケットを閉じていると思われ、 'ECONNRESET'エラーが発生します。たぶん、データ構造のスレッドの同期の問題? –

+0

@GilHamiltonコードの削除部分を追加しました。実際には、「ランダム」な受信ノードは、受信機能の「while」ブロックから決して出てこないので、他の人が接続するのを待っています(他の人は接続していると言います)。だから、ノードは、ソケットを閉じる機会を得るために "while"ブロックから出ることはありません。また、ソケットを閉じるのは "receiveData_Serial"関数ではなく "receiveDataHandler"関数だけです。これ以上の説明が必要な場合は、私は幸せ以上になります。 –

+0

説明にコードをマッチさせるのに苦労しています。たとえば、「ソケットを閉じるのは「receiveDataHandler」だけですが、受け入れられたソケットはまったく閉じられていません。提案:もしノードAがノードBに接続していると言うなら、リモートポート番号がconnect(ノードAのgetpeername)のために何であるか調べる。次に、ノードBで 'netstat -atn'を実行し、そのポートがどの状態にあるかを調べます。 –

答えて

0
  for s, ip in connectionInfoList: 
       s.shutdown(1)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed. 
       s.close() 
      pool.close() 
      pool.join() 

あなたshutdown接続poolでいくつかのrelayWorkerスレッドがまだ未完成であってもよいです。順序を逆にする:

   pool.close() 
       pool.join() 
       for s, ip in connectionInfoList: 
        s.close() 
関連する問題