2016-08-07 15 views
1

私はしばらくの間、Tornadoを使用していましたが、遅いタイミングで問題が発生しました(これはthis questionで尋ねました)。仲間のユーザーから指摘された問題の1つは、コルーチン内でファイルに書き込むのに普通のopen("..." , 'w')を使用していて、これがブロックされている可能性があるということでした。竜巻では、非ブロックファイルの読み書き方法は?

私の質問は、トルネードで非ブロックファイルIOを行う方法はありますか?自分のニーズに合った研究を見つけることができませんでした。

+0

フラグ** os.O_NONBLOCK **を使用できます。 'fd = os.open(" foo.txt "、os.O_RDWR | os.O_CREAT)'詳細情報[ここ](http://www.tutorialspoint.com/python/os_open.htm)。 –

+0

'asyncio 'を使うことができれば(つまり、Python3.3 +を使っている)、[Tornado asyncio bridge](http://www.tornadoweb.org/en/stable/asyncio)を使ってみることができます。 html)と[aiofiles](https://github.com/Tinche/aiofiles)の組み合わせ –

答えて

1

ファイルIOに関連付けられたすべてのコードを、run_on_executorで装飾された別の機能に移動します。

import os 
import io 
from concurrent.futures import ThreadPoolExecutor 
from PIL import Image 

class UploadHandler(web.RequestHandler): 
    executor = ThreadPoolExecutor(max_workers=os.cpu_count()) 

    @gen.coroutine 
    def post(self): 
     file = self.request.files['file'][0] 
     try: 
      thumbnail = yield self.make_thumbnail(file.body) 
     except OSError: 
      raise web.HTTPError(400, 'Cannot identify image file') 
     orig_id, thumb_id = yield [ 
      gridfs.put(file.body, content_type=file.content_type), 
      gridfs.put(thumbnail, content_type='image/png')] 
     yield db.imgs.save({'orig': orig_id, 'thumb': thumb_id}) 
     self.redirect('') 

    @run_on_executor 
    def make_thumbnail(self, content): 
     im = Image.open(io.BytesIO(content)) 
     im.convert('RGB') 
     im.thumbnail((128, 128), Image.ANTIALIAS) 
     with io.BytesIO() as output: 
      im.save(output, 'PNG') 
      return output.getvalue() 
+0

これはioloopを実行するメインスレッドをブロックしませんが、最初はioloopを使用する目的に反します。技術的には、これは小さいサムネイルといくつかのハンドラを同時に生成するために機能します。しかし、これは一般的な解決策ではありません。たとえば、膨大なファイルを数百から数千のリクエストに対して同時にストリーミングすることはできません。 – nagylzs

+0

@nagylzsあなたはブロックすることなく**書き込み**ファイルに別のソリューションを提供できますか? –

+0

残念ながら。 :-(ポータブルではないので、Linuxはカーネルレベルで非同期ファイル操作をサポートしていません。あなたの解決策は今のところ最高です。https://groups.google.com/forum/m/#!topic/python-tornado/ Ycj0_r4j0VQ – nagylzs

0

大きなファイルでは、別のスレッドでファイル全体を読み書きすることができないため、別の答えがあります。 1つのチャンクで大容量のファイルの内容をすべて受信または送信することはできません。メモリが不足している可能性があるからです。

私にとっては、ioloopのメインスレッドのチャンクプロセッサーが速度に追いつくことができないときに、リーダー/ライタースレッドをブロックする方法を見つけることは自明ではありませんでした。以下の実装は、ファイル読み取り操作がチャンクプロセッサーよりもはるかに高速で、ファイル読み取り操作が遅い場合にも効率的に機能します。同期は、非同期キューとロックの組み合わせによって実現され、ioloopのスレッドを決してブロックしません。

ロックはループのスレッド内でのみ解放され、取得されることはなく、競合状態は存在しません。

私はこれが答えとして受け入れられるとは思っていませんが、理解するまでにはしばらく時間がかかりましたので、他の人の実装に役立つと思います。

これは、ファイルの読み書き操作だけでなく、片側が別のスレッドにあり、反対側がioloopにあるコンシューマ/プロデューサのペアに対しても一般化できます。

import os 
import time 
import threading 
from concurrent.futures import ThreadPoolExecutor 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 


def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024): 
    file_size = os.path.getsize(file_path) 
    remaining = file_size 
    fin = open(file_path, "rb") 
    lock = threading.Lock() 

    def putter(chunk, lock: threading.Lock): 
     queue.put(chunk)  # Called from the loop's thread -> can block 
     lock.release()   # Awake reader thread after the chunk has been put into the processing queue 

    def put(chunk, lock): 
     """Put the chunk into the queue, and wait until it is processed by the ioloop""" 
     lock.acquire() # Acquire in this thread 
     io_loop.spawn_callback(putter, chunk, lock) # Release in the loop's thread 
     lock.acquire() # Wait until the loop's thread has accepted the chunk for processing 
     lock.release() # Cleanup before return 

    # Put the file size into the queue without waiting 
    io_loop.spawn_callback(queue.put, file_size) 

    while remaining > 0: 
     chunk = fin.read(min(chunk_size, remaining)) 
     print("read", chunk) 
     remaining -= len(chunk) 
     time.sleep(1) # Just for testing: simulate slow file reads. 
     put(chunk, lock) 

    # Put EOF/terminator into the queue 
    io_loop.spawn_callback(queue.put, None) 


pool = ThreadPoolExecutor(3) 


async def main(): 
    # Create a queue for sending chunks of data 
    cq = Queue(maxsize=3) 
    # Start the reader thread that reads in a separate thread 
    pool.submit(read_file, __file__, cq, io_loop, 100) 
    file_size = await cq.get() 
    print("file size:", file_size) 
    # Process chunks 
    while True: 
     item = await cq.get() 
     # Terminator -> EOF 
     if item is None: 
      break 
     print("got chunk:", repr(item)) 

    io_loop.stop() 


if __name__ == '__main__': 
    io_loop = IOLoop.current() 
    io_loop.run_sync(main) 
    io_loop.start() 
関連する問題