2017-12-24 4 views
1

2つ以上のラインアイテムからなる複数の「レコード」を持つ大きなパンダのデータフレームがあります。私は、マルチプロセッシングを使って各レコードに対してCPU集約的な計算を効率的に実行しようとしています。ここでは単に各レコードに乱数を追加する機能を備えた簡単な例です:パンダのデータフレームでの関数のマルチプロセッシング

import pandas as pd 
from random import randrange 
from multiprocessing import Pool 

#Trivial example function 
def my_func(record): 
    df.loc[((df.Record == record), 'Result')] = randrange(0,100) 
    print (df) 

d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]} 
df = pd.DataFrame(d) 
all_records = df['Record'].unique() 

if __name__ == '__main__': 
    pool = Pool(processes=2) 
    pool.map(my_func,all_records) 
    df.to_csv('output.csv') 

所望の出力は、各レコード用の乱数を含む「結果」​​と題した新しいカラムと、元データフレームです。たとえば:

Record Values Result 
0  A  100 63.0 
1  A  200 63.0 
2  B  50 22.0 
3  B  70 22.0 

実際の結果は私のCSV出力は結果列で更新されていません。関数内のprintステートメントを通してプロセスが動作していることがわかります。私が研究したことから、プロセスはdfのコピーで動作し、一緒に戻されません。どのようにして、各プロセスの結果を単一のデータフレームに反映させることができますか?

答えて

0

これはあなたのために働くかもしれない:

import pandas as pd 
from random import randrange 
from multiprocessing import Pool 

#Trivial example function 
def my_func(record): 
    sub_df = df.loc[df['Record'] == record] 
    sub_df['Result'] = randrange(0,100) 
    # return results for the record as pd.Series 
    return sub_df['Result'] 

d = {'Record': ['A', 'A', 'B', 'B'], 'Values': [100, 200, 50, 70]} 
df = pd.DataFrame(d) 
all_records = df['Record'].unique() 

if __name__ == '__main__': 
    pool = Pool(processes=2) 
    results = pool.map(my_func, all_records) 
    pool.close() 
    pool.join() 

    # concatenate results into a single pd.Series 
    results = pd.concat(results) 

    # join results with original df 
    joined_df = df.join(results) 

    print(joined_df) 
    #  Record Values Result 
    # 0  A  100  90 
    # 1  A  200  90 
    # 2  B  50  62 
    # 3  B  70  62 
関連する問題