2017-01-27 27 views
0

私は、PythonでUDPを使用して多くのパケット損失が発生しています。私は、パケットロスを望まない場合はTCPを使うべきだと知っていますが、送信者に対して(完全な)制御権がありません。Pythonでパケット損失、パケット損失を受け取ります。

UDPマルチキャストを使用して1秒あたり15枚の画像を送信するカメラです。

以下は、今書いたコードです。 マルチプロセッシングを使用して、プロデューサとコンシューマの機能を並行して動作させます。プロデューサ関数はパケットを捕捉し、コンシューマ関数はそれを処理し、画像を.bmpファイルに書き込みます。

パッケージのバイトを.bmpファイルに書き込むクラスPacketStreamを作成しました。

カメラが新しい画像を送信すると、最初に1バイト= 0x01のパケットが送信されます。これには画像に関する情報が含まれています。 その後、最初のバイト= 0x02で612パケットが送信されます。これらは、画像からのバイト数(508バイト/パケット)を含みます。

1秒あたり15枚の画像が送信されるため、1秒あたり〜9000パケットが送信されます。これは、イメージ当たりバースト数が約22パケット/秒の高速な速度で発生します。

すべてのパケットは、tcpdumpまたはwiresharkを使用して完全に受信できます。 しかし、以下のコードを使用すると、パケットが欠落します。 確かに私の窓7 PCはこれを処理できるはずですか?私はラズベリー・パイ3でもそれを使用していますが、多かれ少なかれ同じ数のパケットが欠落しています。だから私はそれがコードの問題だと思う。

私は、マルチ処理の代わりにスレッド、キューの代わりにパイプを使用するなど、さまざまなことを試しました。

Iはまた、無駄に

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000) 

とソケットバッファを増加しようとしました。

これはすべてPythonで可能ですか?事前に

おかげで、

import time 
from multiprocessing import Process, Queue 
import socket 
import struct 
from PIL import Image 


class PacketStream: 
    def __init__(self, output_path): 
     self.output_path = output_path 
     self.data_buffer = '' 
     self.img_id = -1 # -1 = waiting for start of new image 

    def process(self, data): 
     message_id = data[0] 
     if message_id == '\x01': 
      self.wrap_up_last_image() 
      self.img_id = ord(data[3]) 
      self.data_buffer = '' 
     if message_id == '\x02': 
      self.data_buffer += data[6:] 

    def wrap_up_last_image(self): 
     if self.img_id > 0: 
      n_bytes = len(self.data_buffer) 
      if n_bytes == 307200: 
       global i 
       write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp', 
          self.data_buffer) 
       i += 1 
      else: 
       print 'Image lost: %s bytes missing.' % (307200 - n_bytes) 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 


def producer(q): 
    # setup socket 
    MCAST_GRP = '239.255.83.71' 
    MCAST_PORT = 2271 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    sock.bind(('', MCAST_PORT)) 
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
    while True: 
     q.put(sock.recv(512)) 


def consumer(q): 
    packet_stream = PacketStream('D:/bmpdump/') 
    while True: 
     data = q.get() 
     packet_stream.process(data) 

i = 0 
if __name__ == '__main__': 
    q = Queue() 

    t1 = Process(target=producer, args=(q,)) 
    t1.daemon = True # so they stop when the main prog stops 
    t1.start() 
    t2 = Process(target=consumer, args=(q,)) 
    t2.daemon = True 
    t2.start() 

    time.sleep(10.0) 

    print 'Program finished.' 

EDITすべての提案のための

感謝。

1)私はスレッド+キューをすでに試してみましたが、 '' .join()も大きな違いはありませんでした。問題は、プロデューサスレッドが十分な優先順位を得られないということです。私はこれをPythonを使って増やす方法を見つけることができませんか?これも可能ですか?

2)私は以下のコードを使用して約10%しか失われませんでした。最後のデータパッケージが

import time 
import socket 
import struct 
from PIL import Image 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 

def consume(data_buffer): 
    img_id = ord(data_buffer[0][1]) 
    real_data_buffer = [data[6:] for data in data_buffer] 
    data_string = ''.join(real_data_buffer) 

    global i 
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string) 
    i += 1 

def producer(sock): 
    print 'Producer start' 
    data_buffer = [] 
    while True: 
     data = sock.recvfrom(512)[0] 
     if data[0] == '\x01': 
      data_buffer = [] 
     else: 
      data_buffer.append(data) 
     if len(data_buffer) == 612: 
      consume(data_buffer) 


# image counter 
i = 0 

# setup socket 
MCAST_GRP = '239.255.83.71' 
MCAST_PORT = 2271 
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
sock.bind((MCAST_GRP, MCAST_PORT)) 
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000) 

producer(sock) 
+0

私はUDPソケットを扱うときに '' sock.recv''ではなく '' sock.recvfrom''を使わなければならないと思います。多分それが助けますか? –

+0

UDPデータ読み取りスレッド/プロセス(Linuxではsched_setscheduler()など)を使用するか、使用している他のどのOSでも同様のAPIを使用する必要があります。それに対応する)。そうすれば、あなたのリーダースレッドが別のタスクによってCPUから離れてしまう可能性が少なくなり、完全な受信バッファとパケットの廃棄につながる可能性があります。 –

答えて

0

あなたのコードを改善するためのいくつかの提案が到着したとき、プロセッサはつまり、鍵がパケットストリームでの一時停止があるとき、データを消費することである(ラズベリーパイで)〜25%ですしかし、最初に質問:物事を減速させているものをすべて測定しましたか?たとえば、システムのCPU使用率を調べたことがあります。あなたが100%を打っているなら、それは非常にうまくパケット損失の理由かもしれません。それが主にアイドル状態の場合、何か他のことが起こり、問題はコードのパフォーマンスに関係しません。今

、コードを改善するためのいくつかの提案:UDPソケット

  • に対処することは、プロセスとマルチプロセッシング送信するために発生することがありシリアライズを使用していない

    • 使用socket.recvfromの代わりsock.recvあるプロセスから他のプロセスへのデータは、〜9000コール/秒で話しているとパフォーマンスのボトルネックになる可能性があります。代わりにスレッドを使用してください(threading + queueモジュール)。しかし、あなたが観測された数字を提供していないので、本当に言うことは難しいです。
    • 受信者のバッファがパケットを取得する際に文字列連結を使用しないでください。これにより、多数の新しい一時文字列オブジェクトが作成され、常にデータがコピーされます。代わりに、各パケットをリストに追加し、すべてのデータを受け取ったときには、"".join(packets)を一度にまとめてください。
  • 関連する問題