2015-11-04 7 views
5

私の仕事は、指定されたURLリストから1M以上の画像をダウンロードすることです。そうするための推奨される方法は何ですか?geventで画像をダウンロードする

Greenlet Vs. Threadsを読んだ後、私はgeventを調べましたが、確実に実行することができません。私は100個のURLのテストセットで遊んでいましたが、1.5秒で終了することもありましたが、リクエストごとのタイムアウト*が0.1であるので奇妙な30秒以上かかることがあります。

*コード

に以下を参照してください、私はまたgrequestsに見えたが、彼らはissues with exception handling.

私の要件は、「私は

  • は(タイムアウトのダウンロード中に発生するエラーを調べることができます持っているようです、壊れた画像...)、
  • 処理された画像の数の進行を監視し、
  • はできるだけ速い。
from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
import cStringIO 
import gevent.hub 
POOL_SIZE = 300 


def download_image_wrapper(task): 
    return download_image(task[0], task[1]) 

def download_image(image_url, download_path): 
    raw_binary_request = requests.get(image_url, timeout=0.1).content 
    image = Image.open(cStringIO.StringIO(raw_binary_request)) 
    image.save(download_path) 

def download_images_gevent_spawn(list_of_image_urls, base_folder): 
    download_paths = ['/'.join([base_folder, url.split('/')[-1]]) 
         for url in list_of_image_urls] 
    parameters = [[image_url, download_path] for image_url, download_path in 
      zip(list_of_image_urls, download_paths)] 
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters] 
    for task in tasks: 
     try: 
      task.get() 
     except Exception: 
      print 'x', 
      continue 
     print '.', 

test_urls = # list of 100 urls 

t1 = time() 
download_images_gevent_spawn(test_urls, 'download_temp') 
print time() - t1 
+2

スレッドを使用する必要がありますか?代わりに複数のプロセスを使用できる場合は、 'multiprocessing.Pool'でこれを行うことができます。私は 'pool.map(download_image、url_list)'と 'pool.join()'を使って同様のことをします。 – foz

+1

@foz、ありがとうございますが、同様の問題で 'multiprocessing.Pool'も試みました。また、私は、 'マルチプロセッシング 'はそのような種類のタスクのための適切なツールではないと言われました:http://stackoverflow.com/a/27016937/380038 – Framester

+0

興味深い!私は、マルチプロセッシングが効率的でスケーラブルではないことを知ることができますが、私はそれが控えめなプールサイズ(あなたが持っていたように32)で動作してはならない理由を見ません。私はあなたも何かを学ぶと思うようにこれに良い答えを得ることを願っています! – foz

答えて

-1

私はそれはpycurlとmulticurlに基づいてasynchronicパーサーですGrablib http://grablib.org/

に注意を払うことをお勧めします。 また、ネットワークエラーを自動的に解決するために試します(タイムアウトなどの場合に再試行してください)。

私はGrab:Spiderモジュールがあなたの問題を99%解決すると信じています。 http://docs.grablib.org/en/latest/index.html#spider-toc

+0

ありがとうございます。あなたは、grablibが何を違うかを精緻化することができますか、なぜ私のアプローチよりもうまくいくのかを知っていますか? – Framester

+0

Ooops、画像のURLを直接入力しますか?はいの場合、ごめんなさい、あなたはまだグラブなどを持っています。 Grablibは、クロールと解析に理想的です。ただし、Grablib(Grab:Spiderモジュール)は、ネットワークエラーが> 400および!= 404のタスクを再試行することもできます。リトライ回数は手動で設定できます。ロギングとプロセス監視があります。 –

1

が、私はこのコードを試してみてくださいhttps://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

の例により、urllib2のに固執する方が良いだろうと思うが、私はそれはあなたが求めているものであると仮定します。

import gevent 
from gevent import monkey 

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets 
monkey.patch_all() 

import sys 

urls = sorted(chloya_files) 

if sys.version_info[0] == 3: 
    from urllib.request import urlopen 
else: 
    from urllib2 import urlopen 


def download_file(url): 
    data = urlopen(url).read() 
    img_name = url.split('/')[-1] 
    with open('c:/temp/img/'+img_name, 'wb') as f: 
     f.write(data) 
    return True 


from time import time 

t1 = time() 
tasks = [gevent.spawn(download_file, url) for url in urls] 
gevent.joinall(tasks, timeout = 12.0) 
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)) 
print time() - t1 
+0

ありがとう、私は 'urlopen(...、timeout = 0.1)'でそのコードを試しましたが、依然として1000個のURLに対して100秒以上かかるので、リクエストを並行して実行しなかったことを示しています。 – Framester

+0

それはネットワークの問題でしょうか?私のテストでは、いくつかのチェコサイトから139ファイルのために10.1秒かかりました。私は並列性についても疑問を抱いていましたが、今は私がgevent-urlib2ではなくリモートWebサーバーによって制限されていると思います。 – Ingaz

1

HTTPの永続的な接続のためのgeventRequestssimple-requests

使用RequestsSessionを使用して、簡単な解決策があります。 geventRequestsを非同期にしているので、HTTPリクエストではtimeoutは必要ありません。デフォルトでは

、10台のホストと制限、キャッシュされたTCPコネクション(pool_maxsize)あたり10個の同時HTTP要求のrequests.SessionキャッシュのTCPコネクション(pool_connections)。 HTTPアダプタを明示的に作成することで、デフォルト設定を必要に応じて調整する必要があります。

session = requests.Session() 
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
session.mount('http://', http_adapter) 

プロデューサ - コンシューマとしてタスクを中断します。画像ダウンロードはプロデューサタスクであり、画像処理はコンシューマタスクである。

画像処理ライブラリPILが非同期でない場合、プロデューサコルーチンがブロックされる可能性があります。その場合、消費者プールはgevent.threadpool.ThreadPoolになります。 f.e.

from gevent.threadpool import ThreadPool 
consumer = ThreadPool(POOL_SIZE) 

これはどのように行うことができるかの概要です。私はコードをテストしなかった。

from gevent import monkey; monkey.patch_all() 
from time import time 
import requests 
from PIL import Image 
from io import BytesIO 
import os 
from urlparse import urlparse 
from gevent.pool import Pool 

def download(url): 
    try: 
     response = session.get(url) 
    except Exception as e: 
     print(e) 
    else: 
     if response.status_code == requests.codes.ok: 
      file_name = urlparse(url).path.rsplit('/',1)[-1] 
      return (response.content,file_name) 
     response.raise_for_status() 

def process(img): 
    if img is None: 
     return None 
    img, name = img 
    img = Image.open(BytesIO(img)) 
    path = os.path.join(base_folder, name) 
    try: 
     img.save(path) 
    except Exception as e: 
     print(e) 
    else: 
     return True 

def run(urls):   
    consumer.map(process, producer.imap_unordered(download, urls)) 

if __name__ == '__main__': 
     POOL_SIZE = 300 
     producer = Pool(POOL_SIZE) 
     consumer = Pool(POOL_SIZE) 

     session = requests.Session() 
     http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100) 
     session.mount('http://', http_adapter) 

     test_urls = # list of 100 urls 
     base_folder = 'download_temp' 
     t1 = time() 
     run(test_urls) 
     print time() - t1 
+0

あなたの提案をありがとう。私はあなたのコードを私のURLで試しましたが、1kのURLのために200を超える時間がかかります。 1つの問題は、ほとんどのドメインが1つのドメインを指していることですが、多くのドメインが異なるドメインを指していることもあります。 – Framester

+0

どのくらい時間がかかると思いますか?ファイルのサイズ、クライアントの帯域幅、サーバーの負荷はすべてタイミングで役割を果たします。 –

+0

私は消費者のために 'ThreadPool'の使用を提案するために私の答えを更新しました。画像処理がCPUバインドの場合は、 'multiprocessing.Pool'を使用してください。 –

関連する問題