2017-12-27 27 views
2

私はSanicと協力していますが、ちょっと残念です。私は、それぞれ独自の応答時間を持つ3種類のAPIを呼び出しています。まだ完了していないPython非同期イベントループへの応答の出力

各タスクが復帰するのに許容される時間を提供するタイムアウト関数を作成します。しかし、時間のタスクが許容時間内に完了していない場合、私は完全なデータセットを必要とせず、速度がより重視されるため、部分的なデータを返したいと思います。

しかし、私が完了(すなわち。APIデータを要求し、PostgresのDBに挿入するまでの作業未完の仕事を続けたい。

我々は、スケジューラのいくつかの種類を使用せずにこれを行うことができます場合、私は思ったんだけど私は完全なデータセットを必要としないと速度 がより多くのであるとして、一部のデータを返すために を。

+0

あなたはリスナー/ Observerパターンを熟知していますか?これは、いつでも復帰できる非同期コールバックで使用されます。 –

+0

いいえ、リンクはありますか? –

+0

「Python Observer Pattern」のグーグル化については、http://www.giantflyingsaucer.com/blog/?p=5117を参照してください。それ以外の場合は、チュートリアルを手伝ってください。 –

答えて

1

をバックグラウンド内で実行しているタスクを維持しかし、時間のタスクが許容時間内に完了しない場合、私はしたいですフォーカス。

しかし、私は完了まで未完成の仕事を続けたい

他のタスクはタイムアウトタスクの状態とは関係ありません、そうですか?私があなたを正しく理解していれば、自分のタイムアウトで3 asyncio.Taskを実行し、その結果を最終的に集計したいだけです。

状況が整理された方法によって大きく異なる可能性があるので、「部分的なデータを返す」問題が発生する可能性があります。ここで

は少しのプロトタイプです:

import asyncio 


class PartialData(Exception): 
    def __init__(self, data): 
     super().__init__() 
     self.data = data   


async def api_job(i): 
    data = 'job {i}:'.format(i=i) 
    try: 
     await asyncio.sleep(1) 
     data += ' step 1,' 
     await asyncio.sleep(2) 
     data += ' step 2,' 
     await asyncio.sleep(2) 
     data += ' step 3.' 
    except asyncio.CancelledError as exc: 
     raise PartialData(data) # Pass partial data to outer code with our exception. 
    else: 
     return data 


async def api_task(i, timeout): 
    """Wrapper for api_job to run it with timeout and retrieve it's partial data on timeout.""" 
    t = asyncio.ensure_future(api_job(i)) 
    try: 
     await asyncio.wait_for(t, timeout) 
    except asyncio.TimeoutError: 
     try: 
      await t 
     except PartialData as exc: 
      return exc.data # retrieve partial data on timeout and return it. 
    else: 
     return t.result() 


async def main(): 
    # Run 3 jobs with different timeouts: 
    results = await asyncio.gather(
     api_task(1, timeout=2), 
     api_task(2, timeout=4), 
     api_task(3, timeout=6), 
    ) 

    # Print results including "partial data": 
    for res in results: 
     print(res) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 
     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 

出力:

job 1: step 1, 
job 2: step 1, step 2, 
job 3: step 1, step 2, step 3. 

(あなたが最初の2つのジョブがタイムアウトし、検索自分の件のデータの一部のみを終え見ることができるように)

アップデート:

複雑な例では、様々なイベントへの可能な解決策が含まれています。

import asyncio 
from contextlib import suppress 


async def stock1(_): 
    await asyncio.sleep(1) 
    return 'stock1 res' 

async def stock2(exception_in_2): 
    await asyncio.sleep(1) 
    if exception_in_2: 
     raise ValueError('Exception in stock2!') 
    await asyncio.sleep(1) 
    return 'stock2 res' 

async def stock3(_): 
    await asyncio.sleep(3) 
    return 'stock3 res' 


async def main(): 
    # Vary this values to see different situations: 
    timeout = 2.5 
    exception_in_2 = False 


    # To run all three stocks just create tasks for them: 
    tasks = [ 
     asyncio.ensure_future(s(exception_in_2)) 
     for s 
     in (stock1, stock2, stock3) 
    ] 


    # Now we just wait until one of this possible situations happened: 
    # 1) Everything done 
    # 2) Exception occured in one of tasks 
    # 3) Timeout occured and at least two tasks ready 
    # 4) Timeout occured and less than two tasks ready 
    # (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 
    await asyncio.wait(
     tasks, 
     timeout=timeout, 
     return_when=asyncio.FIRST_EXCEPTION 
    ) 

    is_success = all(t.done() and not t.exception() for t in tasks) 
    is_exception = any(t.done() and t.exception() for t in tasks) 
    is_good_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) >= 2 
    is_bad_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) < 2 


    # If success, just print all results: 
    if is_success: 
     print('All done before timeout:') 
     for t in tasks: 
      print(t.result()) 
    # If timeout, but at least two done, 
    # print it leaving pending task to be executing. 
    # But note two important things: 
    # 1) You should guarantee pending task done before loop closed 
    # 2) What if pending task will finish with error, is it ok? 
    elif is_good_timeout: 
     print('Timeout, but enought tasks done:') 
     for t in tasks: 
      if t.done(): 
       print(t.result()) 
    # Timeout and not enought tasks done, 
    # let's just cancel all hanging:  
    elif is_bad_timeout: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Timeout and not enought tasks done') # You probably want indicate fail 
    # If any of tasks is finished with an exception, 
    # we should probably cancel unfinished tasks, 
    # await all tasks done and retrive all exceptions to prevent warnings 
    # (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    elif is_exception: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Exception in one of tasks') # You probably want indicate fail 


async def cancel_and_retrieve(tasks): 
    """ 
    Cancel all pending tasks, retrieve all exceptions 
    (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    It's cleanup function if we don't want task being continued. 
    """ 
    for t in tasks: 
     if not t.done(): 
      t.cancel() 
    await asyncio.wait(
     tasks, 
     return_when=asyncio.ALL_COMPLETED 
    ) 
    for t in tasks: 
     with suppress(Exception): 
      await t 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 

     # If some tasks still pending (is_good_timeout case), 
     # let's kill them: 
     loop.run_until_complete(
      cancel_and_retrieve(asyncio.Task.all_tasks()) 
     ) 

     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 
+0

このロジックを1秒間使用します。在庫を表示しています。 3つの株式があります。あなたは株式の価格を要求しますが、株式2は時間がかかります。待つのではなく、株価1と3を表示するだけです。 –

+0

StandardCitizen株2が1または3の前に到着すれば、私たちは何をしますか? –

+0

あなたの返事をありがとう、許可された時間内に2つの到着を言うことができ、データベースに入力します。 3番手は長すぎます。したがって、リクエストは返されますが、3番目のプロセスは開いたままになり、完了するまでジョブを実行します。うまくいけば、ユーザーのリフレッシュ時に、それが行われます。これらの株式は1日に1回しか開かないと言います。 –

関連する問題