2017-01-11 16 views
0

ファイルを解析し、大きなリストに変換するスクリプトを作成しようとしています。このリストは後で並列処理されるはずです。私はPythonのマルチプロセッシングのいくつかの実装を試みましたが、それらはすべて順番に実行されるようです。Pythonマルチプロセッシング共有リスト

def grouper(n, iterable, padvalue=None): 
    """grouper(3, 'abcdefg', 'x') --> 
    ('a','b','c'), ('d','e','f'), ('g','x','x')""" 
    return izip_longest(*[iter(iterable)]*n, fillvalue=padvalue) 

def createRecords(givenchunk): 
    for i1 in range(len(givenchunk)): 
    <create somedata> 
    records.append(somedata) 

if __name__=='__main__': 
    manager = Manager() 
    parsedcdrs = manager.list([]) 
    records = manager.list([]) 

    <some general processing here which creates a shared list "parsedcdrs". Uses map to create a process "p" in some def which is terminated afterwards.> 

    # Get available cpus 
    cores = multiprocessing.cpu_count() 

    # First implementation with map with map. 
    t = multiprocessing.Pool(cores) 
    print "Map processing with chunks containing 5000" 
    t.map(createRecords, zip(parsedcdr), 5000) 

    # Second implementation with async. 
    t = multiprocessing.Pool(cores) 
    for chunk in grouper(5000, parsedcdr): 
    print "Async processing with chunks containing 5000" 
    t.apply_async(createRecords, args=(chunk,), callback=log_result) 
    t.close() 
    t.join() 

    # Third implementation with Process. 
    jobs = [] 
    for chunk in grouper(5000, parsedcdr): 
    t = multiprocessing.Process(target=createRecords, args=(chunk,)) 
    t.start() 
    jobs.append(t) 
    print "Process processing with chunks containing 5000" 
    for j in jobs: 
    j.join() 
    for j in jobs: 
    j.join() 

誰かが私を正しい方向に向けることができますか?

+0

あなたの最初の実装で見る限り、aproachはほぼOKですが、処理するリストの各要素は別のリスト(または反復可能)ですか?リスト内の要素 – Netwave

+0

"parsedcdr" は、例えば、実際に他のリストである: '[1482232410、[ 'astp3'、u'elem1' 、u'elem2' 、u'elem3' ]]、[1482232576 、['astp3'、u'elem4 '、u'elem5'、u'elem6 ']]]' – driesken

+1

あなたの最初の実装は問題なく動作しているはずですが、どうしてそれらがsecuentiallyで動作していると思いますか?また、「5000」チャンクを削除して、1つ1つを選択するかどうかを確認してください。 – Netwave

答えて

0

上記の例では、マルチプロセッシングが正常に機能しているようです。問題は別のdefに位置し、パフォーマンスが低下しました。

関連する問題