2017-08-30 3 views
11

aiohttpを使用して、同期コードをasyncioに移動するプロセスにあります。同期コードは実行するのに15分かかりましたので、私はこれを改善したいと考えています。Python aiohttp/asyncio - 返されたデータを処理する方法

いくつかのURLからデータを取得し、それぞれの本体を返す作業コードがあります。しかし、これはちょうど1つの研究室サイトに対してです、私は70以上の実際のサイトを持っています。

したがって、リスト内の700個のURLを処理するすべてのサイトのすべてのURLのリストを作成するためのループがあるとします。今それらを処理すると、私は問題ではないと思いますか?

しかし、結果と「もの」をして、私はどのようにプログラムするのか分からないのですか?返される結果のそれぞれに「もの」を加えるコードはすでにありますが、結果の正しいタイプに対してプログラミングする方法がわかりません。

コードを実行すると、すべてのURLが処理され、実行する時間によっては不明な順序が返されますか?

結果を処理する関数が必要ですか?

import asyncio, aiohttp, ssl 
from bs4 import BeautifulSoup 

def page_content(page): 
    return BeautifulSoup(page, 'html.parser') 


async def fetch(session, url): 
    with aiohttp.Timeout(15, loop=session.loop): 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    tasks = [] 
    # Fetch all responses within one Client session, 
    # keep connection alive for all requests. 
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session: 
     for i in urls: 
      task = asyncio.ensure_future(fetch(session, i)) 
      tasks.append(task) 

     responses = await asyncio.gather(*tasks) 
     # you now have all response bodies in this variable 
     for i in responses: 
      print(i.title.text) 
     return responses 


def main(): 
    username = 'monitoring' 
    password = '*********' 
    ip = '10.10.10.2' 
    urls = [ 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'), 
     ] 
    loop = asyncio.get_event_loop() 
    future = asyncio.ensure_future(get_url_data(urls,username,password)) 
    data = loop.run_until_complete(future) 
    print(data) 

if __name__ == "__main__": 
    main() 

答えて

2

あなたのコードは、マークから遠く離れていません。 asyncio.gatherは引数の順に結果を返しますので、ここでは注文は保存されますが、page_contentは順番に呼び出されません。

いくつかの調整:すべての

まず、あなたはここにensure_futureは必要ありません。タスクを作成することは、コルーチンが親より長生きしようとしている場合、つまりタスクを作成した関数が実行されていてもタスクを実行し続ける必要がある場合にのみ必要です。ここでは、代わりにあなたのコルーチンと直接asyncio.gatherを呼び出している必要なもの:

async def get_url_data(urls, username, password): 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(fetch(session, i) for i in urls)) 
    for i in responses: 
     print(i.title.text) 
    return responses 

しかしが、これはすべて同時にフェッチスケジュールしまう呼び出し、およびURLの数が多いと、これが最適からは程遠いです。その代わりに、最大同時実行性を選択して、たいていのXフェッチがいつでも実行されるようにする必要があります。これを実装するにはasyncio.Semaphore(20)を使うことができます。このセマフォは最大20個のコルーチンによってしか取得できないため、他の人はスポットが利用できるようになるまで取得を待つでしょう。

CONCURRENCY = 20 
TIMEOUT = 15 

async def fetch(session, sem, url): 
    async with sem: 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    sem = asyncio.Semaphore(CONCURRENCY) 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(
      asyncio.wait_for(fetch(session, sem, i), TIMEOUT) 
      for i in urls 
     )) 
    for i in responses: 
     print(i.title.text) 
    return responses 

このように、すべてのフェッチはすぐに開始されますが、そのうち20個だけがセマフォを取得できます。他の命令は最初のasync with命令でブロックし、別のフェッチが完了するまで待ちます。

私はaiohttp.Timeoutを公式のasyncioに置き換えました。

最後に、データの実際の処理では、CPU時間によって制限される場合、asyncioはあまり役に立ちません。実際の作業を別のCPUに並列化するには、ProcessPoolExecutorをここで使用する必要があります。 run_in_executorがおそらくこれに使用されます。

+0

ありがとう、私はあなたが言ったことのすべてを理解していますが、あなたはProcessPoolExecutorの部分で私を失ってしまいました。私は結果を別のCPUプロセスを持つ必要がありますか?どうすればいいですか?どのようにそれらを順番に処理するのですか、またはどのタイプに関係なくすべての結果を処理する関数が必要ですか? – AlexW

2

concurrent.futures.ProcessPoolExecutorの例を示します。 max_workersを指定せずに作成された場合、実装では代わりにos.cpu_countが使用されます。また、asyncio.wrap_futureは公開されていますが、文書化されていません。あるいは、AbstractEventLoop.run_in_executorがあります。

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

import aiohttp 
import lxml.html 


def process_page(html): 
    '''Meant for CPU-bound workload''' 
    tree = lxml.html.fromstring(html) 
    return tree.find('.//title').text 


async def fetch_page(url, session): 
    '''Meant for IO-bound workload''' 
    async with session.get(url, timeout = 15) as res: 
     return await res.text() 


async def process(url, session, pool): 
    html = await fetch_page(url, session) 
    return await asyncio.wrap_future(pool.submit(process_page, html)) 


async def dispatch(urls): 
    pool = ProcessPoolExecutor() 
    async with aiohttp.ClientSession() as session: 
     coros = (process(url, session, pool) for url in urls) 
     return await asyncio.gather(*coros) 


def main(): 
    urls = [ 
     'https://stackoverflow.com/', 
     'https://serverfault.com/', 
     'https://askubuntu.com/', 
     'https://unix.stackexchange.com/' 
    ] 
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls)) 
    print(result) 

if __name__ == '__main__': 
    main() 
関連する問題