待ち

2017-01-04 12 views
1

は、私は2つの非同期発電機を持っていると言う:待ち

async def get_rules(): 
    while True: 
     yield 'rule=1' 
     asyncio.sleep(2) 


async def get_snapshots(): 
    while True: 
     yield 'snapshot=1' 
     asyncio.sleep(5) 

私は両方からの最新の値で、2つのタプルを返し、単一の非同期発電機にそれらをマージしたいです。並べ替え:combineLatest

これを行うにはどのような方法が最適ですか?

+0

あなたは明確にすることはできますか?それは、両方のサブジェネレータが行うとき、またはどちらかがどちらのときにのみ得られますか? – Blckknght

+0

@Blckknghtどちらかがそうであるとき。私がasyncioについてもっと学ぶほど、私はこれがうまくいくはずと確信が薄くなります。私はasyncioが私に仕事を使いたいと思っています。そして、どういうわけかこれらの関数の結果を待ち行列や並べ替えのチャンネルで伝えています。 – miracle2k

答えて

0

私はこの思い付いた:私はaiochannelが、通常asyncioを使用しています

{'rules': 'rule-1'} 
{'rules': 'rule-1', 'snap': 'snapshot-1'} 
{'rules': 'rule-1', 'snap': 'snapshot-1'} 
.... 

async for item in combine_latest(rules=rulesgen, snap=snapgen): 
    print(item) 

出力のようになります。そうは次のように

async def combine(**generators): 
    """Given a bunch of async generators, merges the events from 
    all of them. Each should have a name, i.e. `foo=gen, bar=gen`. 
    """ 
    combined = Channel() 
    async def listen_and_forward(name, generator): 
     async for value in generator: 
      await combined.put({name: value}) 
    for name, generator in generators.items(): 
     asyncio.Task(listen_and_forward(name, generator)) 

    async for item in combined: 
     yield item 


async def combine_latest(**generators): 
    """Like "combine", but always includes the latest value from 
    every generator. 
    """ 
    current = {} 
    async for value in combine(**generators): 
     current.update(value) 
     yield current 

はそれを呼び出します.Queueもうまくいくはずです。

4

あなたは特にstream.mergestream.accumulateaiostreamを見てすることがあります:

import asyncio 
from itertools import count 
from aiostream import stream 


async def get_rules(): 
    for x in count(): 
     await asyncio.sleep(2) 
     yield 'rule', x 


async def get_snapshots(): 
    for x in count(): 
     await asyncio.sleep(5) 
     yield 'snapshot', x 


async def main(): 
    xs = stream.merge(get_rules(), get_snapshots()) 
    ys = stream.map(xs, lambda x: {x[0]: x[1]}) 
    zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {}) 

    async with zs.stream() as streamer: 
     async for z in streamer: 
      print(z) 


loop = asyncio.get_event_loop() 
loop.run_until_complete(main()) 
loop.close() 

出力:あなたが生成する複合発電をしたいとき

{} 
{'rule': 0} 
{'rule': 1} 
{'rule': 1, 'snapshot': 0} 
{'rule': 2, 'snapshot': 0} 
[...]