2012-01-07 13 views
0

私は一連のプロセスを持っています。それらをA、B、Cと呼びましょう。これらは互いに通信する必要があります。 AはBとCと通信する必要があります。 BはAとCと通信する必要があります。 CはAとBと通信する必要があります。A、B、Cは異なるマシンまたは同じマシンに配置できます。SocketServerインスタンスとのPythonマルチプロセッシング通信

私は、ソケットを介して通信し、すべてが同じマシンにある場合は "localhost"を使用することを考えました(たとえば、ポート11111のA、ポート22222のBなど)。このようにして、非ローカルプロセスはローカルプロセスのように扱われます。そのためには、A、B、CのそれぞれについてSocketServerインスタンスを設定し、それぞれが他の2つのアドレスを知っていると思った。 AからBのように通信を行う必要があるときはいつでも、AはBへのソケットを開いてデータを書き込む。その後、Bが常時稼働しているサーバーは、データを読み取り、必要に応じて後で使用できるようにリストに格納します。

私が遭遇している問題は、保存された情報がfinish_requestメソッド(リスンを処理しているメソッド)と__call__メソッド(これは処理中です)の間で共有されていないということです。 (サーバークラスは他のものに必要なので呼び出すことができますが、それは問題に関連しているとは思われません)

私の想像どおりの質問です。 multiprocessingthreading、およびsocketserverはすべて同じマシン上でうまく再生されますか? (QueueまたはPipeのような)プロセス間の通信に他のメカニズムを使用することには興味がありません。私はそれらの作業の解決策を持っています。私はこのアプローチが可能であるかどうかを、たとえそれほど効率が悪くても知りたいです。そして、もしそうなら、私は何が間違って働いていないのですか?

問題を示して最小限の例では、以下の通りです:

import uuid 
import sys 
import socket 
import time 
import threading 
import collections 
import SocketServer 
import multiprocessing 

class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer): 
    def __init__(self, server_address, client_addresses, max_migrants=1): 
     SocketServer.TCPServer.__init__(self, server_address, None) 
     self.client_addresses = client_addresses 
     self.migrants = collections.deque(maxlen=max_migrants) 
     self.allow_reuse_address = True 
     t = threading.Thread(target=self.serve_forever) 
     t.daemon = True 
     t.start() 

    def finish_request(self, request, client_address): 
     try: 
      rbufsize = -1 
      wbufsize = 0 
      rfile = request.makefile('rb', rbufsize) 
      wfile = request.makefile('wb', wbufsize) 

      data = rfile.readline().strip() 
      self.migrants.append(data) 
      print("finish_request:: From: %d To: %d MID: %d Size: %d -- %s" % (client_address[1], 
                        self.server_address[1], 
                        id(self.migrants), 
                        len(self.migrants), 
                        data)) 

      if not wfile.closed: 
       wfile.flush() 
      wfile.close() 
      rfile.close()   
     finally: 
      sys.exc_traceback = None 

    def __call__(self, random, population, args): 
     client_address = random.choice(self.client_addresses) 
     migrant_index = random.randint(0, len(population) - 1) 
     data = population[migrant_index] 
     data = uuid.uuid4().hex 
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      sock.connect(client_address) 
      sock.send(data + '\n') 
     finally: 
      sock.close() 
     print("  __call__:: From: %d To: %d MID: %d Size: %d -- %s" % (self.server_address[1], 
                       client_address[1], 
                       id(self.migrants), 
                       len(self.migrants), 
                       data)) 
     if len(self.migrants) > 0: 
      migrant = self.migrants.popleft() 
      population[migrant_index] = migrant 
     return population 


def run_it(migrator, rand, pop): 
    for i in range(10): 
     pop = migrator(r, pop, {}) 
     print("  run_it:: Port: %d MID: %d Size: %d" % (migrator.server_address[1], 
                   id(migrator.migrants), 
                   len(migrator.migrants))) 
     time.sleep(1) 


if __name__ == '__main__': 
    import random 
    r = random.Random() 
    a = ('localhost', 11111) 
    b = ('localhost', 22222) 
    c = ('localhost', 33333) 
    am = NetworkMigrator(a, [b, c], max_migrants=11) 
    bm = NetworkMigrator(b, [a, c], max_migrants=22) 
    cm = NetworkMigrator(c, [a, b], max_migrants=33) 

    fun = [am, bm, cm] 
    pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]] 
    jobs = [] 
    for f, p in zip(fun, pop): 
     pro = multiprocessing.Process(target=run_it, args=(f, r, p)) 
     jobs.append(pro) 
     pro.start() 
    for j in jobs: 
     j.join() 
    am.shutdown() 
    bm.shutdown() 
    cm.shutdown() 

は、この例からの出力を見ると、印刷の3種類があるでしょう:「MID」

 run_it:: Port: 11111 MID: 3071227860 Size: 0 
     __call__:: From: 11111 To: 22222 MID: 3071227860 Size: 0 -- e00e0891e0714f99b86e9ad743731a00 
finish_request:: From: 60782 To: 22222 MID: 3071227972 Size: 10 -- e00e0891e0714f99b86e9ad743731a00 

idですその場合にはmigrantsデュークです。 「From」および「To」は、送信を送受信するポートである。そして、データをランダムな16進文字列に設定して、個々の送信を追跡できるようにしています。

なぜ同じMIDであっても、ある時点でサイズが0ではないと言われますが、後でそのサイズが0であると言われます。コールがマルチスレッド化されていることからこれらの行ではなく、最終的な2つのforループを使用している場合、システムは、私が期待するように動作します:

for _ in range(10): 
    for f, p in zip(fun, pop): 
     f(r, p, {}) 
     time.sleep(1) 

だから何がそれを壊すマルチプロセッシング・バージョンで起きているのでしょうか?

答えて

1

3つの新しいNetworkMigratorオブジェクトを作成すると、3つの新しいスレッドが開始され、それぞれ新しいTCP接続を待機します。その後、run_it関数の3つの新しいプロセスを開始します。合計で4つのプロセスがあり、最初のプロセスには4つのスレッド(1つのメイン+ 3サーバー)が含まれています。ここで問題となるのは、他の3つのプロセスが、リスニングサーバースレッドによってオブジェクトに加えられた変更にアクセスできないことです。これは、プロセスがデフォルトでメモリを共有しないためです。

あなたは3つの新しいスレッドの代わりに、プロセスを開始するのであれば、あなたは違いがわかります。

pro = threading.Thread(target=run_it,args=(f,r,p)) 

別のマイナーな問題があります。スレッド間のこの共有も完全に安全ではありません。オブジェクトの状態を変更するたびにロックを使用するのが最良です。 finish_requestとの両方で以下のようにするのが最善です。メソッドを呼び出します。あなたはマルチスレッドに不満があるとあなたがマルチプロセッシングをしたいならば、ここで説明したように

lock = Lock() 
... 
lock.acquire()  
self.migrants.append(data) 
lock.release() 

、その後、プロキシオブジェクトを使用することができます。 http://docs.python.org/library/multiprocessing.html#proxy-objects

をオブジェクトIDは同一であることについては、それは予想外ではありません。新しいプロセスは、その時点でオブジェクトの状態(オブジェクトIDを含む)に渡されます。新しいプロセスはこれらのオブジェクトIDを保持するために行われますが、ここでは異なるプロセスであるため完全に異なる2つのメモリ空間について説明しています。したがって、メインプロセスによって行われた変更は、作成されたサブプロセスに反映されません。

+0

優れた睡眠。非常に明確な。本当にありがとう。 – agarrett

関連する問題