サブスクリプションの仕組みはコールバックの世界に属するものであり、asyncio.Futureはコールバックベースの世界とコルーチンベースの世界をつなぐ橋のようなものです。Futureを使うと、何かが起こるのを待つことができます。メッセージが届いたときにはset_resultを呼び出し、コルーチン内でメッセージを受信したいときはそのFutureをawaitしてください。例えば:
import asyncio
from random import randint
from collections import defaultdict
from contextlib import suppress
# MSG SUBSCRIBE/SEND API:
# Pending subscriptions: one shared Future per target, created lazily by the
# defaultdict factory the first time a coroutine subscribes to that target.
_futures = defaultdict(asyncio.Future)


async def msg_for_target(target):
    """Wait for the next message sent to ``target`` and return it.

    Every coroutine waiting on the same target shares one Future, so a
    single ``send_msg`` call wakes all of them with the same message.
    """
    # shield() stops a cancelled subscriber from cancelling the shared
    # Future, which would otherwise break the other subscribers and make
    # the next send_msg() fail on an already-finished Future.
    return await asyncio.shield(_futures[target])


def send_msg(target, msg):
    """Deliver ``msg`` to the coroutines currently waiting on ``target``.

    A message sent while nobody is subscribed to ``target`` is dropped.
    """
    # pop() retires the Future so later subscribers get a fresh one, and it
    # avoids creating a throwaway Future when there is no subscriber.
    future = _futures.pop(target, None)
    # A Future that is already finished (e.g. cancelled) cannot take a
    # result; skipping it keeps send_msg() from raising InvalidStateError.
    if future is not None and not future.done():
        future.set_result(msg)
# TEST:
async def random_sender():
    """Endlessly send numbered messages to randomly chosen targets.

    Every half second a message ``msg 1``, ``msg 2``, ... is sent to a
    target drawn from 0..3 inclusive. The loop never ends on its own; the
    caller is expected to cancel the task running this coroutine.
    """
    i = 1
    while True:
        # Pause first so subscribers get a chance to register.
        await asyncio.sleep(0.5)
        msg = f'msg {i}'
        target = randint(0, 3)
        print(f'Sending msg "{msg}" for target "{target}" ...')
        send_msg(target, msg)
        i += 1
async def main():
    """Run the demo: subscribe to targets 2, 0, 3, 1 in turn.

    A background task keeps sending random messages while this coroutine
    waits for one message per target and prints it. The background task is
    always cancelled and awaited before returning, even if waiting for a
    message fails or this coroutine itself is cancelled.
    """
    task = asyncio.ensure_future(random_sender())
    try:
        for target in (2, 0, 3, 1):
            print(f'> Subscribed for target "{target}"')
            msg = await msg_for_target(target)
            print(f'> Recieved "{msg}"')
            print()
    finally:
        # Cleanup, see https://stackoverflow.com/a/43810272/1113207
        # Done in ``finally`` so the sender task is never left pending when
        # the event loop is closed.
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop exists yet is deprecated and raises RuntimeError on recent
    # Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Finalize any async generators before the loop is closed.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
出力:
> Subscribed for target "2"
Sending msg "msg 1" for target "2" ...
> Recieved "msg 1"
> Subscribed for target "0"
Sending msg "msg 2" for target "3" ...
Sending msg "msg 3" for target "0" ...
> Recieved "msg 3"
> Subscribed for target "3"
Sending msg "msg 4" for target "2" ...
Sending msg "msg 5" for target "2" ...
Sending msg "msg 6" for target "2" ...
Sending msg "msg 7" for target "1" ...
Sending msg "msg 8" for target "3" ...
> Recieved "msg 8"
> Subscribed for target "1"
Sending msg "msg 9" for target "0" ...
Sending msg "msg 10" for target "0" ...
Sending msg "msg 11" for target "2" ...
Sending msg "msg 12" for target "2" ...
Sending msg "msg 13" for target "1" ...
> Recieved "msg 13"