2016-06-19 3 views
0

ブロック入出力を使用して大量のデータを取得するさまざまな関数があり、ストリーム/ファイルライクなオブジェクト(チャンクなど)を書くことができます。私はこのデータをクライアントに提供するために竜巻HTTPサーバーを持っています。私がメモリにすべてのデータを保存することができない限り、私はソースからそれを受け取っているので、クライアントにストリームする必要があります。だから私はそのようなことを書いた:Python Tornado HTTPサーバーでクライアントに関数をブロックした結果

import logging 

logging.basicConfig(level=logging.DEBUG) 

from concurrent.futures import ThreadPoolExecutor 
from tornado import gen, httpserver, httpclient, web, ioloop, httputil, escape, locks, iostream 
from threading import Event 

def get_data(stream): 
    with open('/tmp/qq.dat') as file: 
     for chunk in iter(lambda: file.read(64*1024), b''): 
      stream.write(chunk) 

class ProxyStream(object): 
    def __init__(self, request): 
     self._request = request 

    def write(self, data): 
     self._request.write(data) 
     event = Event() 
     self._request.flush(callback=lambda: event.set()) 
     event.wait() 
     return len(data) 

class Test(web.RequestHandler): 
    def initialize(self, workers): 
     self._workers = workers 

    @gen.coroutine 
    def get(self): 
     stream = ProxyStream(self) 
     yield self._workers.submit(get_data, stream) 
     logging.debug("GET done") 
     self.finish() 


if __name__ == '__main__': 
    workers = ThreadPoolExecutor(4) 
    app = web.Application([ 
     (r"/test", Test, {'workers': workers}), 
     ]) 

    server = httpserver.HTTPServer(app)                                     server.bind(1488) 
    server.start(1) 
    ioloop.IOLoop.current().start() 

それは、上記get_data()関数をコーディングし、いくつかのファイルを読み込み(それは非常に大きいかもしれない)と、引数として渡されたストリームにチャンクに書き込みます。ストリームは、受信データをRequestHandlerオブジェクトに書き込むProxyStreamオブジェクトによってエミュレートされ、チャンクがネットワークにフラッシュされるまで待機します。

このコードは予期したとおりに動作するようですが、この方法にはいくつかの落とし穴があるのか​​、それともこの問題を解決する良い方法があるのか​​という疑問があります。

答えて

0

実際、私は私のために良く見える解決に導くいくつかの問題に直面しました。

のRequestHandler write()flush()メソッドは、スレッドセーフではありませんし、(thisthisを参照)ioloopが実行されているスレッドから呼び出されなければなりません。正しい方法はwrite() & flush()IOLoop.add_callbackにラップして、次のioloopイテレーションで呼び出されるようにすることです。 結果のコードは次のようになめらかである:

class ProxyStream(object): 
def __init__(self, handler, headers=[]): 
    self._handler = handler 

def sync_write(self, data, event): 
    self._handler.write(data) 
    self._handler.flush(callback=lambda: event.set()) 

def write(self, data): 
    if not self._handler.dead: 
     event = Event() 
     IOLoop.current().add_callback(self.sync_write, data, event) 
     event.wait() 
     return len(data) 
    else: 
     raise IOError("Client has closed connection") 

RequestHandlerは、クライアントが切断されると、ストリーミングを停止するon_connection_close()self.dead = Trueを設定する必要があり、いくつかの同期コードに自身を通過します)

関連する問題