大きなファイルでは、別のスレッドでファイル全体を読み書きすることができないため、別の答えがあります。 1つのチャンクで大容量のファイルの内容をすべて受信または送信することはできません。メモリが不足している可能性があるからです。
私にとっては、ioloopのメインスレッドのチャンクプロセッサーが速度に追いつくことができないときに、リーダー/ライタースレッドをブロックする方法を見つけることは自明ではありませんでした。以下の実装は、ファイル読み取り操作がチャンクプロセッサーよりもはるかに高速で、ファイル読み取り操作が遅い場合にも効率的に機能します。同期は、非同期キューとロックの組み合わせによって実現され、ioloopのスレッドを決してブロックしません。
ロックはループのスレッド内でのみ解放され、取得されることはなく、競合状態は存在しません。
私はこれが答えとして受け入れられるとは思っていませんが、理解するまでにはしばらく時間がかかりましたので、他の人の実装に役立つと思います。
これは、ファイルの読み書き操作だけでなく、片側が別のスレッドにあり、反対側がioloopにあるコンシューマ/プロデューサのペアに対しても一般化できます。
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.queues import Queue
def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
file_size = os.path.getsize(file_path)
remaining = file_size
fin = open(file_path, "rb")
lock = threading.Lock()
def putter(chunk, lock: threading.Lock):
queue.put(chunk) # Called from the loop's thread -> can block
lock.release() # Awake reader thread after the chunk has been put into the processing queue
def put(chunk, lock):
"""Put the chunk into the queue, and wait until it is processed by the ioloop"""
lock.acquire() # Acquire in this thread
io_loop.spawn_callback(putter, chunk, lock) # Release in the loop's thread
lock.acquire() # Wait until the loop's thread has accepted the chunk for processing
lock.release() # Cleanup before return
# Put the file size into the queue without waiting
io_loop.spawn_callback(queue.put, file_size)
while remaining > 0:
chunk = fin.read(min(chunk_size, remaining))
print("read", chunk)
remaining -= len(chunk)
time.sleep(1) # Just for testing: simulate slow file reads.
put(chunk, lock)
# Put EOF/terminator into the queue
io_loop.spawn_callback(queue.put, None)
pool = ThreadPoolExecutor(3)
async def main():
# Create a queue for sending chunks of data
cq = Queue(maxsize=3)
# Start the reader thread that reads in a separate thread
pool.submit(read_file, __file__, cq, io_loop, 100)
file_size = await cq.get()
print("file size:", file_size)
# Process chunks
while True:
item = await cq.get()
# Terminator -> EOF
if item is None:
break
print("got chunk:", repr(item))
io_loop.stop()
if __name__ == '__main__':
io_loop = IOLoop.current()
io_loop.run_sync(main)
io_loop.start()
フラグ** os.O_NONBLOCK **を使用できます。 'fd = os.open(" foo.txt "、os.O_RDWR | os.O_CREAT)'詳細情報[ここ](http://www.tutorialspoint.com/python/os_open.htm)。 –
'asyncio 'を使うことができれば(つまり、Python3.3 +を使っている)、[Tornado asyncio bridge](http://www.tornadoweb.org/en/stable/asyncio)を使ってみることができます。 html)と[aiofiles](https://github.com/Tinche/aiofiles)の組み合わせ –