私は何をしたいですか:私は単純なメッセージストリームが必要なので、いくつかのスクリプトはそこに結果を送信し、別のスクリプトは結果を取り、いくつかは非同期で動作します。Pythonスクリプトの共有メッセージストリームを作成する最も良い方法は何ですか?
主な問題:私は何が起こっているのか見たいと思っています。何かが壊れている場合は、すぐに修正できます。私はCelery + RabbitMQを使用しようとしました.Argsを持つワーカーを見ることができますが、Flowerを使用していますが、複雑すぎるスケジューリングとマルチプロセッシング.Queue(単純ですが、argsを持つワーカーを見ることはできません)。
私がやった何:は私が反応するように、似たような、MongoDBのキャッピングされたコレクションを使用して構築し、複数のプロセスでpopenのを実行しようとしました。いくつかのスクリプトはコレクションにsmthを書き込み、以下のスクリプトはそれを監視し、ある条件が満たされている場合は別のスクリプトを実行します。
主な問題:multiprocessing.Process(内側から subprocess.Popen()の使用)は、(まだ作業を行う)不自然に見えるので、私はより良いおよび/またはより安定した解決策を見つけようとしている:)
リスナースクリプト:
from pymongo import MongoClient, CursorType
from time import sleep
from datetime import datetime
from multiprocessing import Process
import subprocess
def worker_email(keyword):
subprocess.Popen(["python", "worker_email.py", str(keyword)])
def worker_checker(keyword):
subprocess.Popen(["python", "worker_checker.py", str(keyword)])
if __name__ == '__main__':
#DB connect
client = MongoClient('mongodb://localhost:27017/')
db = client.admetric
coll = db.my_collection
cursor = coll.find(cursor_type = CursorType.TAILABLE_AWAIT)
#Script start UTC time
utc_run = datetime.utcnow()
while cursor.alive:
try:
doc = cursor.next()
#Print doc name/args to see in command line, while Listener runs
print(doc)
#Filter docs without 'created' data
if 'created' in doc.keys():
#Ignore docs older than script
if doc['created'] > utc_run:
#Filter docs without 'type' data
if 'type' in doc.keys():
#Check type
if doc['type'] == 'send_email':
#Create process and run external script
p = Process(target=worker_email, args=(doc['message'],))
p.start()
p.join()
#Check type
elif doc['type'] == 'check_data':
#Create process and run external script
p = Process(target=worker_checker, args=(doc['message'],))
p.start()
p.join()
except StopIteration:
sleep(1)
なぜ 'subprocess'を使ってPythonスクリプトを実行していますか? 'multiprocessing.Process'でPythonコードを実行するだけです。 – noxdafox
@noxdafox私は外部ファイルごとに500〜600行のコードがあるので、サブプロセスはimport *よりもきれいに見えます。私はメインスクリプトの名前空間を切り捨てるのを怖がっている。 – sortas