2017-12-15 4 views
2

私はこれに続くRoute_Guide sampleです。Python gRPCでストリーミングメッセージを処理するにはどうすればいいですか

このサンプルは、特定のメッセージに返信せずにメッセージを読み取り、読み取ります。後者は私が達成しようとしているものです。

が、ここで私がこれまで持っているものです。

import grpc 
... 

channel = grpc.insecure_channel(conn_str) 
try: 
    grpc.channel_ready_future(channel).result(timeout=5) 
except grpc.FutureTimeoutError: 
    sys.exit('Error connecting to server') 
else: 
    stub = MyService_pb2_grpc.MyServiceStub(channel) 
    print('Connected to gRPC server.') 
    this_is_just_read_maybe(stub) 


def this_is_just_read_maybe(stub): 
    responses = stub.MyEventStream(stream()) 
    for response in responses: 
     print(f'Received message: {response}') 
     if response.something: 
      # okay, now what? how do i send a message here? 

def stream(): 
    yield my_start_stream_msg 
    # this is fine, i receive this server-side 
    # but i can't check for incoming messages here 

私はスタブ上read()またはwrite()を持っていないようだ、すべてがイテレータで実装しているようです。

this_is_just_read_maybe(stub)からメッセージを送信するにはどうすればよいですか? これは正しいアプローチですか?

service MyService { 
    rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {} 
} 

答えて

2

何をしようとしていることは完全に可能であり、彼らが使用するのではなく、到着すると、おそらく応答を与えることができ、独自のリクエストiteratorオブジェクトを書い関与します:

私のプロトは、双方向の流れであります単純なジェネレータをリクエストイテレータとして使用します。おそらく、あなたはその後、

my_smarter_request_iterator = MySmarterRequestIterator() 
responses = stub.MyEventStream(my_smarter_request_iterator) 
for response in responses: 
    my_smarter_request_iterator.add_response(response) 

ように使用

class MySmarterRequestIterator(object): 

    def __init__(self): 
     self._lock = threading.Lock() 
     self._responses_so_far = [] 

    def __iter__(self): 
     return self 

    def _next(self): 
     # some logic that depends upon what responses have been seen 
     # before returning the next request message 
     return <your message value> 

    def __next__(self): # Python 3 
     return self._next() 

    def next(self): # Python 2 
     return self._next() 

    def add_response(self, response): 
     with self._lock: 
      self._responses.append(response) 

のようなもの。あなたの_next実装では、gRPC Pythonが送信したい次の要求と実際に応答していることをオブジェクトに要求する状況を処理するために、おそらくロックとブロックが行われます( "wait、hold on、私はどの要求私は次の応答がどのようになったかを見た後まで送るつもりです。

+0

ありがとうナサニエル、私はそれを試してみます。 Golangの実装のように、PythonのgRPCが '.send()'と '.recv()'プリミティブを公開しないという特別な理由があります。 – evilSnobu

+0

APIが設計された時点では、Pythonのユースケースのための少しの辺のケースとみなされていましたので、最も簡単なこととより一般的なことはもう少しの作業で可能でした。 –

+0

これらのすべてのロックを持つ多くのクライアントに双方向接続を提供すると、この解決策はどれほど拡張性がありますか? – Skulas

関連する問題