2016-07-15 25 views
3

GRPCリクエストごとにセロリタスクを呼び出し、その結果を返す必要があります。 デフォルトのGRPC実装では、各要求はスレッドプールとは別のスレッドで処理されます。どのように非同期grpcのPythonサーバーを実装するには?

私の場合、サーバーは1秒あたり〜400リクエストをバッチモードで処理することになっています。そのため、1回のリクエストでバッチ処理のために1秒間待たなければならない場合があります。つまり、ブロックするのを防ぐためにスレッドプールのサイズを400より大きくする必要があります。

これは非同期で行うことができますか? ありがとうございました。

class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer): 
    def ReportEvent(self, request, context): 
    res = tasks.add.delay(1,2) 
    result = res.get() ->here i have to block 
    return ss_pb2.GeneralReply(message='Hello, %s!' % result.message) 

答えて

3

(それはasyncキーワードで定義されている場合)res.getへのお電話は、非同期的に行うことができるかどうかは非同期に行うことができます。

While grpc.server says it requires a futures.ThreadPoolExecutor, it will actually work with any futures.Executor that calls the behaviors submitted to it on some thread other than the one on which they were passed。あなたが実装したgrpc.serverfutures.Executorを渡していたのですが、1つのスレッドだけを使用してEventReporting.ReportEventに400回以上の同時呼び出しを行うと、サーバーはあなたが記述する種類のブロックを回避する必要があります。

1

私の意見では、aiohttpをベースにしたhttpのような、単純な非同期grpcサーバーの実装がいいです。元に

import asyncio 
from concurrent import futures 
import functools 
import inspect 
import threading 

from grpc import _server 

def _loop_mgr(loop: asyncio.AbstractEventLoop): 

    asyncio.set_event_loop(loop) 
    loop.run_forever() 

    # If we reach here, the loop was stopped. 
    # We should gather any remaining tasks and finish them. 
    pending = asyncio.Task.all_tasks(loop=loop) 
    if pending: 
     loop.run_until_complete(asyncio.gather(*pending)) 


class AsyncioExecutor(futures.Executor): 

    def __init__(self, *, loop=None): 

     super().__init__() 
     self._shutdown = False 
     self._loop = loop or asyncio.get_event_loop() 
     self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,), 
             daemon=True) 
     self._thread.start() 

    def submit(self, fn, *args, **kwargs): 

     if self._shutdown: 
      raise RuntimeError('Cannot schedule new futures after shutdown') 

     if not self._loop.is_running(): 
      raise RuntimeError("Loop must be started before any function can " 
           "be submitted") 

     if inspect.iscoroutinefunction(fn): 
      coro = fn(*args, **kwargs) 
      return asyncio.run_coroutine_threadsafe(coro, self._loop) 

     else: 
      func = functools.partial(fn, *args, **kwargs) 
      return self._loop.run_in_executor(None, func) 

    def shutdown(self, wait=True): 
     self._loop.stop() 
     self._shutdown = True 
     if wait: 
      self._thread.join() 


# --------------------------------------------------------------------------- # 


async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): 
    context = _server._Context(rpc_event, state, request_deserializer) 
    try: 
     return await behavior(argument, context), True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception calling application: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _take_response_from_response_iterator(rpc_event, state, response_iterator): 
    try: 
     return await response_iterator.__anext__(), True 
    except StopAsyncIteration: 
     return None, True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception iterating responses: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     response, proceed = await _call_behavior(rpc_event, state, behavior, 
               argument, request_deserializer) 
     if proceed: 
      serialized_response = _server._serialize_response(
       rpc_event, state, response, response_serializer) 
      if serialized_response is not None: 
       _server._status(rpc_event, state, serialized_response) 

async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     # Notice this calls the normal `_call_behavior` not the awaitable version. 
     response_iterator, proceed = _server._call_behavior(
      rpc_event, state, behavior, argument, request_deserializer) 
     if proceed: 
      while True: 
       response, proceed = await _take_response_from_response_iterator(
        rpc_event, state, response_iterator) 
       if proceed: 
        if response is None: 
         _server._status(rpc_event, state, None) 
         break 
        else: 
         serialized_response = _server._serialize_response(
          rpc_event, state, response, response_serializer) 
         print(response) 
         if serialized_response is not None: 
          print("Serialized Correctly") 
          proceed = _server._send_response(rpc_event, state, 
                serialized_response) 
          if not proceed: 
           break 
         else: 
          break 
       else: 
        break 

_server._unary_response_in_pool = _unary_response_in_pool 
_server._stream_response_in_pool = _stream_response_in_pool 


if __name__ == '__main__': 
    server = grpc.server(AsyncioExecutor()) 
    # Add Servicer and Start Server Here 

リンク:
https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be

+0

私はコメントを修正しました。これは良い実装だと思うので、否定的な評価を削除してください – Vetos

関連する問題