2017-12-16 9 views
5

djangoアプリケーションでは、私は非同期タスクを実行しており、進行状況やエラーなどをユーザーに表示したいと考えています。エラーが発生した場合は、問題を解決するために追加の入力や何らかのアクションが必要なページにリダイレクトする必要があります。セロリの仕事からフロントエンドに伝える最良の方法は何ですか?Djangoのセロリの従業員がリアルタイムのステータスと結果メッセージをフロントエンドに送信する

ここで擬似コードでの基本的な構造です:私は、理想的にしたいと思う何

# views.py 
from tasks import run_task 

def view_task(): 
    run_task.delay() 
    return render(request, 'template.html') 

# tasks.py 
from compute_module import compute_fct 

@shared_task 
def run_task(): 
    result = compute_fct() 

    # how to catch status update messages from compute_module while compute_fct is running?? 

    if result == 'error': 
     handle_error() 
    else: 
     handle_succes()  

# compute_module 
import pandas as pd 

def compute_fct(): 
    # send message: status = loading file 
    df = pd.read_csv('test.csv') 
    # send message: status = computing 
    val = df['col'].mean() 

    if val is None: 
     return {'status':'error'} 
    else: 
     return {'status':'success','val':val} 

  • compute_module.pyモジュールは、Pythonネイティブロガーを使用しています。任務の分離によって、できるだけジェネリックとしてロギングを維持し、標準のpython/djangoロガーを使用したいと思います。しかし彼らはフロントエンドにメッセージを送るようには設計されていないようです。
  • セロリのタスクは、何らかの形でログを処理し、標準出力の代わりにそれらを表示する
  • フロントエンドjsのショーをプッシャするためにそれらをリダイレクトし、セロリの労働者とフロントの間の通信の標準的な方法があるかもしれませんメッセージ

を扱います私が気づいていない終わり。このシナリオは頻繁に起こらなければならず、実装が非常に難しいと私は驚いています。つまり、rabbitmqメッセージキューまたはaws snsはこのために設計されていなければなりません。私が見たリソースはどちらかというとうまく動作しないと感じていませんが、ちょうど混乱しているかもしれません。

ログ:これはユーザー

セロリカムにメッセージを送信しない、サーバ側でのロギングの詳細のようです管理者の監視タスクに関するもので、センではないようですユーザーへの鼎メッセージが

プッシャー私は好きですが、私はそれでcompute_module.pyの契約を持っている必要はありません。つまり、私はcompute_module.pyの中にpusher.comの統合をしたくないと思う。モジュールはちょうど私が私が「唯一の方法

+0

ているのですか?あなたがタスクを実行する、それは完了したか、エラーです。サブタスクとして分解されたタスクを実行した場合、Webワーカーを使用して各サブの最終出力をクライアントに戻すことができますか?私も実際に感じていない*ユーザーのフィードバックメカニズムとしてのPythonのロギング - 私は素晴らしい*出力を得ると思われます.HTMLのためのespは価値があるよりも面倒です。 –

答えて

0

総称することを好むだろうもう一度メッセージをプッシュすることができますが、私はすでにインスタンス化されたプッシャーオブジェクトを渡すことができると思いますリアルタイムのステータスを取得するために管理されているのは、単にSQL書き込み/ API呼び出しをタスク自体に入れることです。カスタムタスククラスを書くだけで済むので、タスクの戻り値を使って作業するのははるかに簡単です。

これはDjangoを使ってどのように動作するのかよく分かりませんが、このようになります。

class CustomTask(celery.Task): 
    def __call__(self, *args, **kwargs): 
     self.start_time = time.time() 

    def on_success(self, retval, task_id, args, kwargs): 
     do_success_stuff() 

    def on_failure(self, exc, task_id, args, kwargs, einfo): 
     do_failure_stuff() 

@shared_task(base=CustomTask) 
def do_stuff(): 
    return create_widgets() 

完全なリストはここで見つけることができます: http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

+0

これらのウィジェットはページ参照なしでUIにどのように表示されるのですか? –

+0

私はどこか優雅なソリューションを提供していると確信していますが、私は自分のタスクをテーブルに書き込んだ後、ステータスカラムを更新します。あなたが仕事を開始するとすぐにtask_idを持っているので、新しいステータスを取得するためにいくつかのjqueryのマジックを行うことができます。たぶんhttp://www.giantflyingsaucer.com/blog/?p=4310のようなもの – lpiner

0

[OK]をので、以下では、私は今のところ、それを解決してきた方法のための擬似コードです。基本的に私はhttps://pusher.com/docs/javascript_quick_startを使用し、サーバー側はインスタンス化されたオブジェクトをcompute_moduleに渡します。 1つの欠点は、プッシャーのメッセージがエフェメラルなので、私は別の仕事をしなければならないということです。LogPusherもう1日は何かをDBに保存する...

私の実際の実装では、小タスクが非常に高速に完了したため、接続が確立されていないためにプッシャーメッセージが表示されないため、$.post() ajaxコールで$(document).ready()を呼び出します(履歴メッセージの問題に戻る)。

私は上記のなかったもう一つの代替ルートは、あなたのケースで進捗レポートの場所であるものhttps://channels.readthedocs.io/en/latest/

# views.py 
from tasks import run_task 

def view_task(): 
    run_task.delay('event') 
    return render(request, 'template.html', 'pusher_event':'event') 


# tasks.py 
import pusher 
from django.conf import settings 
from compute_module import compute_fct 

class LogPusher(object): 
    def __init__(self, event): 
     self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID, 
         key=settings.PUSHER_KEY, 
         secret=settings.PUSHER_SECRET, 
         cluster=settings.PUSHER_CLUSTER, ssl=True) 
     self.event = event 

    def send(self, data): 
     self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data)) 

@shared_task 
def run_task(pusher_event): 

    log_pusher = LogPusher(pusher_event) 
    result = compute_fct(log_pusher) 

    # how to catch status update messages from compute_module while compute_fct is running?? 

    if result == 'error': 
      log_pusher.send('status':'error') 
    else: 
      log_pusher.send('status':'success') 


# compute_module.py 
import pandas as pd 

def compute_fct(log_pusher): 
    # send message: status = loading file 
    log_pusher.send('status':'loading file') 
    df = pd.read_csv('test.csv') 
    # send message: status = computing 
    log_pusher.send('status':'computing') 
    val = df['col'].mean() 

    if val is None: 
     return {'status':'error'} 
    else: 
     return {'status':'success','val':val} 


# context_processors.py 
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django 
from django.conf import settings 

def pusher(request): 
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL } 


# template.html 
<script> 

var pusher = new Pusher("{{PUSHER_KEY}}", { 
    cluster: "{{PUSHER_CLUSTER}}", 
    encrypted: true  
}); 

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}"); 
channel.bind("{{pusher_event}}", function(data) { 
    // process data 
}); 

</script> 
関連する問題