2017-12-14 25 views
0

質問フラスケからセロリーワーカーに割り込みを送るには?

私はこのtutorialのように、フラスコのアプリケーションからのセロリの労働者からのステータスにアクセスするにいくつかを読んだが、あなたは他の道を行くことができますか?それが開始された後、割り込みを送信するか、セロリの労働者にイントロスペクションを取得しますか?

私は少しだけsignalsを読んだことがありますが、まだそれらを理解していないか、または私が探しているものではありません。おそらく両方。

背景

私はMQTTトピックにサブスクライブ長時間実行ループをキックオフするためにセロリを使用しています、私はまた、別のエンドポイントから、そのプロセス/サブスクリプションをシャットダウンできるようにしたいのですが私のフラスコアプリ。これを行う最善の方法は何ですか?または方法?

サンプルコード

from flask import Flask 
from celery import Celery 
import time 

app = Flask(__name__) 
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' 
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' 

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 
celery.conf.update(app.config) 

@celery.task(bind=True) 
def test_loop(self): 
    i=0 
    running = True 
    while running: 
     i = i+1 
     print "loop running %d" % i 
     time.sleep(1) 

@app.route('/') 
def index(): 
    return 'index page' 

@app.route('/start') 
def start(): 
    global task 
    task = test_loop.delay() 
    return "started loop" 

@app.route('/stop') 
def stop(): 
    global task    ### What I'm having trouble with 
    task.running = False ### How can I interrupt/introspect into the task? 
    return "stopped loop" 

TL/

DRは、それが開始されていた後、セロリワーカーにイントロスペクションを割り込みを送信したり、取得する方法はありますか?どのように私はフラスコからセロリワーカーで開始された長期実行ループを停止できますか?

答えて

1

私の個人的な考えは、永遠に実行されるタスクから遠ざかることです。

タスクを強制終了する必要がある場合は、取り消しを使用できます。 http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

@app.route('/stop') 
def stop(): 
    global task 
    task.revoke(terminate=True, signal='SIGKILL') 
    return "stopped loop" 

セロリは、あなたのユースケースのためにやり過ぎかもしれないが、私はあなたの最終目標は、私は本当にすべての選択肢を提供することはできませんが何であるか全くわかりません。

+0

素晴らしいです。私はまた、セロリが最初は残酷であると思っていましたが、マルチプロセッシングを動作させることができませんでした。 私の最終目標は、BBQを喫煙しながら無線バーベキュー温度計からの温度の読み取り値を収集、記録、プロット、および分析することです。通常、セッションは数時間から12時間以上です。受信したデータをMQTTメッセージとして出力できる[rtl433](https://github.com/merbanan/rtl_433)からの読書を受けています。今はラズベリーパイで本質的に常に稼働しているので、RTLSDRハードウェアの依存関係をWebサーバーから切り離すことができます。 – washer

+0

Webサーバーでは、MQTTトピックを購読し、JSONファイルに新しいメッセージを書き込みます。これは、上記のコードで 'test_loop'によって長らく実行されている作業です。 私は、ログのパスとファイル名のようなFlaskからいくつかの設定を渡すことができるようにしたいと思います。 MQTTクライアント(私は[paho-mqtt](https://pypi.python.org/pypi/paho-mqtt/1.1)を使って接続をセットアップし、 'while True:time.sleep(1) ) 'メッセージを待っていると、長時間実行されるタスクの代替手段がわからない。 もっと良い方法を考えてもいいですか? – washer

関連する問題