2017-01-17 4 views
1
import falcon 
import json 
from tasks import add 
from waitress import serve 


class tasksresource: 
    def on_get(self, req, resp): 
     """Handles GET requests""" 
     self.result = add.delay(1, 2) 
     self.context = {'ID': self.result.id, 'final result': self.result.ready()} 
     resp.body = json.dumps(self.context) 



api = falcon.API() 
api.add_route('/result', tasksresource()) 
# api.add_route('/result/task', taskresult()) 
if __name__ == '__main__': 
    serve(api, host='127.1.0.1', port=5555) 

行うどのように私を取得JSONペイロードからタスクIDを取得します(ポストデータ) とそれここファルコンPythonの例

+0

私が理解したように、タスクを開始した後、別のルートにタスクIDを送信したいですか?そうですか? –

+0

はい、彼らは私が理解していない用語プーリングです –

答えて

3

小さな例へのルートを追加します。ファイルの構造:

/project 
     __init__.py 
     app.py # routes, falcon etc. 
     tasks.py # celery 
example.py # script for demonstration how it works 

app.py:

import json 

import falcon 
from tasks import add 
from celery.result import AsyncResult 


class StartTask(object): 

    def on_get(self, req, resp): 
     # start task 
     task = add.delay(4, 4) 
     resp.status = falcon.HTTP_200 
     # return task_id to client 
     result = {'task_id': task.id} 
     resp.body = json.dumps(result) 


class TaskStatus(object): 

    def on_get(self, req, resp, task_id): 
     # get result of task by task_id and generate content to client 
     task_result = AsyncResult(task_id) 
     result = {'status': task_result.status, 'result': task_result.result} 
     resp.status = falcon.HTTP_200 
     resp.body = json.dumps(result) 


app = falcon.API() 

# registration of routes 
app.add_route('/start_task', StartTask()) 
app.add_route('/task_status/{task_id}', TaskStatus()) 

tasks.py:

from time import sleep 

import celery 


app = celery.Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') 


@app.task 
def add(x, y): 
    """ 
    :param int x: 
    :param int y: 
    :return: int 
    """ 
    # sleep just for demonstration 
    sleep(5) 

    return x + y 

は、今、私たちはceleryアプリケーションを起動する必要があります。 projectフォルダに移動し、実行します。

celery -A tasks worker --loglevel=info 

我々はFalconアプリケーションを起動する必要があり、この後。 projectフォルダに移動して実行します。

gunicorn app:app 

すべてが準備ができています。

from time import sleep 

import requests 
# start new task 
task_info = requests.get('http://127.0.0.1:8000/start_task') 
task_info = task_info.json() 

while True: 
    # check status of task by task_id while task is working 
    result = requests.get('http://127.0.0.1:8000/task_status/' + task_info['task_id']) 
    task_status = result.json() 

    print task_status 

    if task_status['status'] == 'SUCCESS' and task_status['result']: 
     print 'Task with id = %s is finished' % task_info['task_id'] 
     print 'Result: %s' % task_status['result'] 
     break 
    # sleep and check status one more time 
    sleep(1) 

だけpython ./example.pyを呼び出して、あなたはこのような何かが表示されます:これはあなたを助け

{u'status': u'PENDING', u'result': None} 
{u'status': u'PENDING', u'result': None} 
{u'status': u'PENDING', u'result': None} 
{u'status': u'PENDING', u'result': None} 
{u'status': u'PENDING', u'result': None} 
{u'status': u'SUCCESS', u'result': 8} 
Task with id = 76542904-6c22-4536-99d9-87efd66d9fe7 is finished 
Result: 8 

希望

example.pyは理解するのを助けることができる小さなクライアント側です。

+0

あなたはすばらしいバディですDanila Ganchar –

+0

@DevJalla開発に幸運を祈る! –

+0

私の次のタスクは、 " requests.get( 'http://127.0.0.1:8000/start_task')ユーザが、java oauthサーバで検証しなければならないトークンを渡さなければならないという、incommingリクエストでoauthを検証することです。例またはあなたが助けることができる –