2016-09-25 10 views
0

これはgroupbyを並列化することでこのquestionが見つかりました。しかし、私が間違っていない限り、それは複数の引数があるケースに1対1に変換することはできません。複数の引数を持つgroupbyを並列化してください

正しい方法は次のとおりですか?より良い方法がありますか? (特にインデックスを取得するのは非常に非効率的だったようだ)。

def applyParallel(dfGrouped, func, *args): 
    with Pool(cpu_count() - 2) as p: 
     ret_list = p.starmap(func, zip([group for name, group in dfGrouped], repeat(*args))) 

    index = [name for name, group in dfGrouped] 
    return pd.Series(index=index, data=ret_list) 

applyParallel(df.groupby(foo), someFunc, someArgs)を使用して呼び出します。

答えて

1

最初の注意点は、データがかなり大きい場合を除き、並列化のメリットがあまりないことです。

マルチプロセッシングプールで直接作業するのではなく、これを行う最も簡単な方法は、daskを試すことです。これはパンダ風のAPIを提供します。

df = pd.DataFrame(np.random.randn(10000000, 10), columns=list('qwertyuiop')) 

df['key'] = np.random.randint(0, 100, size=len(df)) 

import dask.dataframe as dd 

# want a partition size small enough to easily fit into memory 
# but large enough to make the overhead worth it 
ddf = dd.from_pandas(df, npartitions=4) 

%timeit df.groupby('key').sum() 
1 loop, best of 3: 1.05 s per loop 

# calculated in parallel on the 4 partitions 
%timeit ddf.groupby('key').sum().compute() 
1 loop, best of 3: 695 ms per loop 

デフォルトでは、DASKはGILを解放sumのような機能のための高速でデータフレームのためのスレッドベースのスケジューラを、使用することに注意してください。カスタムPython関数(GILが必要)を適用している場合は、複数処理スケジュールでより良いパフォーマンスが得られます。

dask.set_options(get=dask.multiprocessing.get) 
関連する問題