2012-05-21 10 views
6

私のプロジェクトでは、並列にタスクを実行するためにmultiprocessingクラスを使用します。代わりにthreadingを使用します。パフォーマンスが向上しています(私のタスクはTCP/IPバインドで、CPUまたはI/Oバインドではありません)。マルチプロセッシングからスレッド化への移行

multiprocessingは、threadingクラスには存在しないPool.imap_unorderedPool.map_asyncという素晴らしい機能を備えています。

threadingを代わりに使用するようにコードを変換する正しい方法はありますか?ドキュメントにはmultiprocessing.dummyクラスが導入されています。これはthreadingクラスのラッパーです。しかし、(少なくとものpython 2.7.3上の)エラーの多くを上げること:

pool = multiprocessing.Pool(processes) 
    File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool 
    return ThreadPool(processes, initializer, initargs) 
    File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__ 
    Pool.__init__(self, processes, initializer, initargs) 
    File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__ 
    self._repopulate_pool() 
    File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool 
    w.start() 
    File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start 
    self._parent._children[self] = None 
AttributeError: '_DummyThread' object has no attribute '_children' 

編集:実際に何が起こることは、私は別のスレッドを実行するGUIを持っているということです(gettintが立ち往生からGUIを防ぐため) 。そのスレッドは、失敗したThreadPoolを持つ特定の検索機能を実行します。

編集2:バグ修正was fixedは今後のリリースに含まれます。 クラッシャーが修正されました。

import urllib2, htmllib, formatter 
import multiprocessing.dummy as multiprocessing 
import xml.dom.minidom 
import os 
import string, random 
from urlparse import parse_qs, urlparse 

from useful_util import retry 
import config 
from logger import log 

class LinksExtractor(htmllib.HTMLParser): 
    def __init__(self, formatter): 
     htmllib.HTMLParser.__init__(self, formatter) 
     self.links = [] 
     self.ignoredSites = config.WebParser_ignoredSites 

    def start_a(self, attrs): 
     for attr in attrs: 
      if attr[0] == "href" and attr[1].endswith(".mp3"): 
       if not filter(lambda x: (x in attr[1]), self.ignoredSites): 
        self.links.append(attr[1]) 

    def get_links(self): 
     return self.links 


def GetLinks(url, returnMetaUrlObj=False): 
    ''' 
    Function gather links from a url. 
    @param url: Url Address. 
    @param returnMetaUrlObj: If true, returns a MetaUrl Object list. 
          Else, returns a string list. Default is False. 

    @return links: Look up. 
    ''' 
    htmlparser = LinksExtractor(formatter.NullFormatter()) 

    try: 
     data = urllib2.urlopen(url) 
    except (urllib2.HTTPError, urllib2.URLError) as e: 
     log.error(e) 
     return [] 
    htmlparser.feed(data.read()) 
    htmlparser.close() 

    links = list(set(htmlparser.get_links())) 

    if returnMetaUrlObj: 
     links = map(MetaUrl, links) 

    return links 

def isAscii(s): 
    "Function checks is the string is ascii." 
    try: 
     s.decode('ascii') 
    except (UnicodeEncodeError, UnicodeDecodeError): 
     return False 
    return True 

@retry(Exception, logger=log) 
def parse(song, source): 
    ''' 
    Function parses the source search page and returns the .mp3 links in it. 
    @param song: Search string. 
    @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong. 

    @return links: .mp3 url links. 
    ''' 
    source = source.lower() 
    if source == "dilandau": 
     return parse_dilandau(song) 
    elif source == "mp3skull": 
     return parse_Mp3skull(song) 
    elif source == "SeekASong": 
     return parse_SeekASong(song) 
    elif source == "youtube": 
     return parse_Youtube(song) 

    log.error('no source "%s". (from parse function in WebParser)') 
    return [] 

def parse_dilandau(song, pages=1): 
    "Function connects to Dilandau.eu and returns the .mp3 links in it" 
    if not isAscii(song): # Dilandau doesn't like unicode. 
     log.warning("Song is not ASCII. Skipping on dilandau") 
     return [] 

    links = [] 
    song = urllib2.quote(song.encode("utf8")) 

    for i in range(pages): 
     url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (song.replace('-','').replace(' ','-').replace('--','-').lower(),i+1) 
     log.debug("[Dilandau] Parsing %s... " % url) 
     links.extend(GetLinks(url, returnMetaUrlObj=True)) 
    log.debug("[Dilandau] found %d links" % len(links)) 

    for metaUrl in links: 
     metaUrl.source = "Dilandau" 

    return links 

def parse_Mp3skull(song, pages=1): 
    "Function connects to mp3skull.com and returns the .mp3 links in it" 
    links = [] 
    song = urllib2.quote(song.encode("utf8")) 

    for i in range(pages): 
     # http://mp3skull.com/mp3/how_i_met_your_mother.html 
     url = 'http://mp3skull.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) 
     log.debug("[Mp3skull] Parsing %s... " % url) 
     links.extend(GetLinks(url, returnMetaUrlObj=True)) 
    log.debug("[Mp3skull] found %d links" % len(links)) 

    for metaUrl in links: 
     metaUrl.source = "Mp3skull" 

    return links 

def parse_SeekASong(song): 
    "Function connects to seekasong.com and returns the .mp3 links in it" 
    song = urllib2.quote(song.encode("utf8")) 

    url = 'http://www.seekasong.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) 
    log.debug("[SeekASong] Parsing %s... " % url) 
    links = GetLinks(url, returnMetaUrlObj=True) 
    for metaUrl in links: 
     metaUrl.source = "SeekASong" 
    log.debug("[SeekASong] found %d links" % len(links)) 

    return links 

def parse_Youtube(song, amount=10): 
    ''' 
    Function searches a song in youtube.com and returns the clips in it using Youtube API. 
    @param song: The search string. 
    @param amount: Amount of clips to obtain. 

    @return links: List of links. 
    ''' 
    "Function connects to youtube.com and returns the .mp3 links in it" 
    song = urllib2.quote(song.encode("utf8")) 
    url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount) 
    urlObj = urllib2.urlopen(url, timeout=4) 
    data = urlObj.read() 
    videos = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0].getElementsByTagName('entry') 

    links = [] 
    for video in videos: 
     youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value 
     links.append(get_youtube_hightest_quality_link(youtube_watchurl)) 

    return links 

def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority): 
    ''' 
    Function returns the highest quality link for a specific youtube clip. 
    @param youtube_watchurl: The Youtube Watch Url. 
    @param priority: A list represents the qualities priority. 

    @return MetaUrlObj: MetaUrl Object. 
    ''' 
    video_id = parse_qs(urlparse(youtube_watchurl).query)['v'][0] 
    youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id 

    d = get_youtube_dl_links(video_id) 
    for x in priority: 
     if x in d.keys(): 
      return MetaUrl(d[x][0], 'youtube', d['VideoName'], x, youtube_embedded_watchurl) 
    log.error("No Youtube link has been found in get_youtube_hightest_quality_link.") 
    return "" 

@retry(Exception, logger=log) 
def get_youtube_dl_links(video_id): 
    ''' 
    Function gets the download links for a youtube clip. 
    This function parses the get_video_info format of youtube. 

    @param video_id: Youtube Video ID. 
    @return d: A dictonary of qualities as keys and urls as values. 
    ''' 
    d = {} 

    url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id 

    urlObj = urllib2.urlopen(url, timeout=12) 
    data = urlObj.read() 
    data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data))) 
    data = data.replace(',url', '\nurl') 
    data = data.split('\n') 

    for line in data: 
     if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line: 
      continue 

     try: 
      url = line.split('&quality=')[0].split('url=')[1] 
      quality = line.split('&quality=')[1].split('&')[0] 
     except: 
      continue 
     if quality in d: 
      d[quality].append(url) 
     else: 
      d[quality] = [url] 

    try: 
     videoName = "|".join(data).split('&title=')[1].split('&')[0] 
    except Exception, e: 
     log.error("Could not parse VideoName out of get_video_info (%s)" % str(e)) 
     videoName = "" 

    videoName = unicode(videoName, 'utf-8') 
    d['VideoName'] = videoName.replace('+',' ').replace('--','-') 
    return d 


class NextList(object): 
    "A list with a 'next' method." 
    def __init__(self, l): 
     self.l = l 
     self.next_index = 0 

    def next(self): 
     if self.next_index < len(self.l): 
      value = self.l[self.next_index] 
      self.next_index += 1 
      return value 
     else: 
      return None 

    def isEOF(self): 
     " Checks if the list has reached the end " 
     return (self.next_index >= len(self.l)) 

class MetaUrl(object): 
    "a url strecture data with many metadata" 
    def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""): 
     self.url = str(url) 
     self.source = source 
     self.videoName = videoName # Youtube Links Only 
     self.quality = quality # Youtube Links Onlys 
     self.youtube_watchurl = youtube_watchurl # Youtube Links Onlys 

    def __repr__(self): 
     return "<MetaUrl '%s' | %s>" % (self.url, self.source) 


def search(song, n, processes=config.search_processes): 
    ''' 
    Function searches song and returns n valid .mp3 links. 
    @param song: Search string. 
    @param n: Number of songs. 
    @param processes: Number of processes to launch in the subprocessing pool. 
    ''' 
    linksFromSources = [] 
    pool = multiprocessing.Pool(processes) 

    args = [(song, source) for source in config.search_sources] 
    imapObj = pool.imap_unordered(_parse_star, args) 
    for i in range(len(args)): 
     linksFromSources.append(NextList(imapObj.next(15))) 
    pool.terminate() 

    links = [] 
    next_source = 0 
    while len(links) < n and not all(map(lambda x: x.isEOF(), linksFromSources)): 
     nextItem = linksFromSources[next_source].next() 
     if nextItem: 
      log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source)) 
      links.append(nextItem) 

     if len(linksFromSources) == next_source+1: 
      next_source = 0 
     else: 
      next_source += 1 

    return links 

def _parse_star(args): 
    return parse(*args) 
+0

?参照があるといいですね。 –

+0

コードはhttp://pastebin.com/F8QVUtkPです。これは多くのプロジェクトファイルへの参照を持っていますが、主な機能は 'search()'です。この問題は 'pool = multiprocessing.Pool(processes)'コマンドで発生します。元のマルチプロセッシングクラスが使用されているときは発生しません。 – iTayb

+0

@iTayb:あなたは 'プロセス 'としてどのような議論をしていますか? –

答えて

6

私のマシンで問題を再現できません。変数processesには何が入っていますか?それはintですか?

Python 2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)] on win32 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import multiprocessing.dummy as multiprocessing 
>>> pool = multiprocessing.Pool(5) 
>>> pool 
<multiprocessing.pool.ThreadPool object at 0x00C7DF90> 
>>> 

----編集----

おそらく、また別のフォルダにはPython 2.7.3のクリーンインストールを試してみてください、あなたはあなたの標準ライブラリを台無しにしていた場合はダブルチェックしたいです。

----編集2 ----

あなたはすぐにこのようにそれにパッチを適用することができます:あなたのコードがどのように見えるん何

import multiprocessing.dummy 
import weakref 
import threading 

class Worker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     poll = multiprocessing.dummy.Pool(5) 
     print str(poll) 

w = Worker() 
w._children = weakref.WeakKeyDictionary() 
w.start() 
+0

あなたのコードはここで動作します。面白いことに、実際にアップロードしたコードは、直接実行すると機能します。実際には、別のスレッドを実行するGUIを持っている(GUIのgettintがスタックされないようにする)。そのスレッドは失敗した 'ThreadPool'を持つ特定の' search'関数を実行します。 – iTayb

+1

あなたは正しいです、私はPythonがあなたが非プロセスホストからThreadPoolを作成する状況をカバーしていないと思います。解決方法は2を参照してください。あるいは、ハッキリしないように、あなたのGUIスレッドで 'Poll'を作成してワーカースレッドに渡して使うことができます。 – xbtsw

+1

ありがとうございます。また、Python開発チームのバグの問題も出しました:http://bugs.python.org/issue14881。どうもありがとうございました! – iTayb

関連する問題