2017-12-20 2 views
0

私は何をしたいですか:私は単純なメッセージストリームが必要なので、いくつかのスクリプトはそこに結果を送信し、別のスクリプトは結果を取り、いくつかは非同期で動作します。Pythonスクリプトの共有メッセージストリームを作成する最も良い方法は何ですか?

主な問題:私は何が起こっているのか見たいと思っています。何かが壊れている場合は、すぐに修正できます。私はCelery + RabbitMQを使用しようとしました.Argsを持つワーカーを見ることができますが、Flowerを使用していますが、複雑すぎるスケジューリングとマルチプロセッシング.Queue(単純ですが、argsを持つワーカーを見ることはできません)。

私がやった何

は私が反応するように、似たような、MongoDBのキャッピングされたコレクションを使用して構築し、複数のプロセスでpopenのを実行しようとしました。いくつかのスクリプトはコレクションにsmthを書き込み、以下のスクリプトはそれを監視し、ある条件が満たされている場合は別のスクリプトを実行します。

主な問題:multiprocessing.Process(内側から subprocess.Popen()の使用)は、(まだ作業を行う)不自然に見えるので、私はより良いおよび/またはより安定した解決策を見つけようとしている:)


リスナースクリプト:

from pymongo import MongoClient, CursorType 
from time import sleep 
from datetime import datetime 

from multiprocessing import Process 
import subprocess 

def worker_email(keyword): 
    subprocess.Popen(["python", "worker_email.py", str(keyword)]) 

def worker_checker(keyword): 
    subprocess.Popen(["python", "worker_checker.py", str(keyword)]) 

if __name__ == '__main__': 

    #DB connect 
    client = MongoClient('mongodb://localhost:27017/') 
    db = client.admetric 
    coll = db.my_collection 
    cursor = coll.find(cursor_type = CursorType.TAILABLE_AWAIT) 

    #Script start UTC time 
    utc_run = datetime.utcnow() 

    while cursor.alive: 
     try: 
      doc = cursor.next() 
      #Print doc name/args to see in command line, while Listener runs 
      print(doc) 
      #Filter docs without 'created' data 
      if 'created' in doc.keys(): 
       #Ignore docs older than script 
       if doc['created'] > utc_run: 
        #Filter docs without 'type' data 
        if 'type' in doc.keys(): 
         #Check type 
         if doc['type'] == 'send_email': 
          #Create process and run external script 
          p = Process(target=worker_email, args=(doc['message'],)) 
          p.start() 
          p.join() 
         #Check type 
         elif doc['type'] == 'check_data': 
          #Create process and run external script 
          p = Process(target=worker_checker, args=(doc['message'],)) 
          p.start() 
          p.join() 
     except StopIteration: 
      sleep(1) 
+0

なぜ 'subprocess'を使ってPythonスクリプトを実行していますか? 'multiprocessing.Process'でPythonコードを実行するだけです。 – noxdafox

+0

@noxdafox私は外部ファイルごとに500〜600行のコードがあるので、サブプロセスはimport *よりもきれいに見えます。私はメインスクリプトの名前空間を切り捨てるのを怖がっている。 – sortas

答えて

1

限り、あなたはworker_emailworker_checkerロジックを管理しているとして、あなたは別のインタプリタで実行する必要はありません。

2つのモジュールのエントリポイントを公開し、multiprocessing.Processで実行するだけです。

worker_email.py

def email_job(message): 
    # start processing the message here 

worker_checker.py

def check_job(message): 
    # start checking the message here 

listener_script.py

# you are not going to pollute the listener namespace 
# as the only names you import are the entry points of the scripts 
# therefore, encapsulation is preserved 
from worker_email import email_job 
from worker_checker import check_job 

email_process = Process(target=email_job, args=[message]) 
check_process = Process(target=check_job, args=[message]) 

あなたは労働者のモジュールからのエントリポイントを公開することができない場合は、単にsubprocess.Popenを実行します。あなたはProcessの中にそれらをラップすることに何の利益もありません。

+0

1.私はインポートを使用しようとしましたが、従業員のファイルにはそれぞれ500〜600行のコードが含まれていましたので、Popen()を使用することをインポートよりもきれいにしました* :) --- 2.プロセス内でそれらをラップすると、複数のコアを使用できます.5〜10秒ごとに50〜100人の作業者を実行する必要があります。 – sortas

+0

1.いいえ、クリーナーではありません。あなたが必要とする機能だけをインポートすることは、全く別のPythonインタプリタを起動するよりはるかに優れています。コード例のコメントを確認してください。 2. 'subprocess.Popen'は既に新しいプロセスでロジックを実行します。プロセス内でそれをラップすると、2つのプロセスが終了し、どちらか一方が他方を待っています。どんな恩恵もありません:) – noxdafox

+0

オクラホマ、そうです:) – sortas

関連する問題