私は一連のプロセスを持っています。それらをA、B、Cと呼びましょう。これらは互いに通信する必要があります。 AはBとCと通信する必要があります。 BはAとCと通信する必要があります。 CはAとBと通信する必要があります。A、B、Cは異なるマシンまたは同じマシンに配置できます。SocketServerインスタンスとのPythonマルチプロセッシング通信
私は、ソケットを介して通信し、すべてが同じマシンにある場合は "localhost"を使用することを考えました(たとえば、ポート11111のA、ポート22222のBなど)。このようにして、非ローカルプロセスはローカルプロセスのように扱われます。そのためには、A、B、CのそれぞれについてSocketServerインスタンスを設定し、それぞれが他の2つのアドレスを知っていると思った。 AからBのように通信を行う必要があるときはいつでも、AはBへのソケットを開いてデータを書き込む。その後、Bが常時稼働しているサーバーは、データを読み取り、必要に応じて後で使用できるようにリストに格納します。
私が遭遇している問題は、保存された情報がfinish_request
メソッド(リスンを処理しているメソッド)と__call__
メソッド(これは処理中です)の間で共有されていないということです。 (サーバークラスは他のものに必要なので呼び出すことができますが、それは問題に関連しているとは思われません)
私の想像どおりの質問です。 multiprocessing
、threading
、およびsocketserver
はすべて同じマシン上でうまく再生されますか? (Queue
またはPipe
のような)プロセス間の通信に他のメカニズムを使用することには興味がありません。私はそれらの作業の解決策を持っています。私はこのアプローチが可能であるかどうかを、たとえそれほど効率が悪くても知りたいです。そして、もしそうなら、私は何が間違って働いていないのですか?
問題を示して最小限の例では、以下の通りです:
import uuid
import sys
import socket
import time
import threading
import collections
import SocketServer
import multiprocessing
class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, server_address, client_addresses, max_migrants=1):
SocketServer.TCPServer.__init__(self, server_address, None)
self.client_addresses = client_addresses
self.migrants = collections.deque(maxlen=max_migrants)
self.allow_reuse_address = True
t = threading.Thread(target=self.serve_forever)
t.daemon = True
t.start()
def finish_request(self, request, client_address):
try:
rbufsize = -1
wbufsize = 0
rfile = request.makefile('rb', rbufsize)
wfile = request.makefile('wb', wbufsize)
data = rfile.readline().strip()
self.migrants.append(data)
print("finish_request:: From: %d To: %d MID: %d Size: %d -- %s" % (client_address[1],
self.server_address[1],
id(self.migrants),
len(self.migrants),
data))
if not wfile.closed:
wfile.flush()
wfile.close()
rfile.close()
finally:
sys.exc_traceback = None
def __call__(self, random, population, args):
client_address = random.choice(self.client_addresses)
migrant_index = random.randint(0, len(population) - 1)
data = population[migrant_index]
data = uuid.uuid4().hex
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(client_address)
sock.send(data + '\n')
finally:
sock.close()
print(" __call__:: From: %d To: %d MID: %d Size: %d -- %s" % (self.server_address[1],
client_address[1],
id(self.migrants),
len(self.migrants),
data))
if len(self.migrants) > 0:
migrant = self.migrants.popleft()
population[migrant_index] = migrant
return population
def run_it(migrator, rand, pop):
for i in range(10):
pop = migrator(r, pop, {})
print(" run_it:: Port: %d MID: %d Size: %d" % (migrator.server_address[1],
id(migrator.migrants),
len(migrator.migrants)))
time.sleep(1)
if __name__ == '__main__':
import random
r = random.Random()
a = ('localhost', 11111)
b = ('localhost', 22222)
c = ('localhost', 33333)
am = NetworkMigrator(a, [b, c], max_migrants=11)
bm = NetworkMigrator(b, [a, c], max_migrants=22)
cm = NetworkMigrator(c, [a, b], max_migrants=33)
fun = [am, bm, cm]
pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]]
jobs = []
for f, p in zip(fun, pop):
pro = multiprocessing.Process(target=run_it, args=(f, r, p))
jobs.append(pro)
pro.start()
for j in jobs:
j.join()
am.shutdown()
bm.shutdown()
cm.shutdown()
は、この例からの出力を見ると、印刷の3種類があるでしょう:「MID」
run_it:: Port: 11111 MID: 3071227860 Size: 0
__call__:: From: 11111 To: 22222 MID: 3071227860 Size: 0 -- e00e0891e0714f99b86e9ad743731a00
finish_request:: From: 60782 To: 22222 MID: 3071227972 Size: 10 -- e00e0891e0714f99b86e9ad743731a00
id
ですその場合にはmigrants
デュークです。 「From」および「To」は、送信を送受信するポートである。そして、データをランダムな16進文字列に設定して、個々の送信を追跡できるようにしています。
なぜ同じMIDであっても、ある時点でサイズが0ではないと言われますが、後でそのサイズが0であると言われます。コールがマルチスレッド化されていることからこれらの行ではなく、最終的な2つのfor
ループを使用している場合、システムは、私が期待するように動作します:
for _ in range(10):
for f, p in zip(fun, pop):
f(r, p, {})
time.sleep(1)
だから何がそれを壊すマルチプロセッシング・バージョンで起きているのでしょうか?
優れた睡眠。非常に明確な。本当にありがとう。 – agarrett