2017-11-09 2 views
1

this post上の第二の答えを取ると、私はこれは私が望む結果ではない、次のコードpandas dataframeでpool.starmap()を使用するには?

from multiprocessing import Pool 
import numpy as np 
from itertools import repeat 
import pandas as pd 

def doubler(number, r): 
    result = number * 2 + r 
    return result 

def f1(): 
    return np.random.randint(20) 

if __name__ == '__main__': 
    df = pd.DataFrame({"A": [10,20,30,40,50,60], "B": [-1,-2,-3,-4,-5,-6]}) 
    num_chunks = 3 
    # break df into 3 chunks 
    chunks_dict = {i:np.array_split(df, num_chunks)[i] for i in range(num_chunks)} 

    arg1 = f1() 

    with Pool() as pool: 
     results = pool.starmap(doubler, [zip(chunks_dict[i]['B'], repeat(arg1)) for i in range(num_chunks)]) 

    print(results) 

>>> [(-1, 20, -1, 20, -2, 20), (-3, 20, -3, 20, -4, 20), (-5, 20, -5, 20, -6, 20)] 

を試してみました。私が欲しいのは、の各要素をdoubler関数とf1からの出力にフィードすることです。これがstarmaprepeatを使用して2倍の入力といくつかのランダムな整数のリスト出力を得る理由ですそれに。例えば

f1の出力が2だった場合、私は

>>> [0,-2,-4,-6,-8,-10] # [2*(-1) + 2, 2*(-2) + 2, ... ] 

誰もが、私はこの所望の結果を達成する方法を助言することができ返すようにしたいですか?おかげ

EDIT:全データフレームを挿入するといずれかの動作しません:

with Pool() as pool: 
    results = pool.starmap(doubler, [zip(df['B'], repeat(arg1))]) 

>>> TypeError: doubler() takes 2 positional arguments but 6 were given 

基本的に、私はちょうどチャンクに私のデータフレームを分割したい、と(ARG1)これらのチャンクだけでなく、他の変数を与えます複数の引数を受け入れる関数に変換します。

+0

あなたの 'f1()'はその乱数を返すべきでしょうか?私はあなたの問題を解決するとは思わないが、それは奇妙に見える。 – Paul

答えて

1

あなたの議論は正しくはありません。例えば、doublerで引数のprintを追加した後、私は以下を参照してください。(f1()戻り2仮定):tuplesのリストだけとは対照的に、starmapに渡される引数は一緒にzippedているので、これはある

doubler number (-3, 2) r (-4, 2) 
doubler number (-1, 2) r (-2, 2) 
doubler number (-5, 2) r (-6, 2) 

を。

チャンクの手順と引数の生成を書き直すほうがずっと簡単だと思います。私はこれを正しく理解していると仮定して、引数の次のタプルリストを得たいとします(は2を返します)。

[(-1,2)、(-2,2)、(-3、 2)、(4、2)(-5、2)、(-6、2)]

これは次いで、[[0, -2, -4, -6, -8, -10]starmap戻るこの[doubler(-1, 2), doubler(-2, 2),...doubler(-6, 2)]そのdoubler関数に適用されます。これを試してください:

from multiprocessing import Pool 
import numpy as np 
from itertools import repeat 
import pandas as pd 


def doubler(number, r): 
    result = number * 2 + r 
    return result 


def f1(): 
    return np.random.randint(20) 


if __name__ == '__main__': 
    df = pd.DataFrame({"A": [10, 20, 30, 40, 50, 60], "B": [-1, -2, -3, -4, -5, -6]}) 
    num_processes = 3 

    # the "r" value to use with every "B" value 
    random_r = f1() 

    # zip together a list of tuples of each B value and the random r value 
    tuples = [(b, r) for b, r in zip(df.B.values, repeat(random_r, len(df.B.values)))] 
    print(tuples) 

    with Pool(num_processes) as pool: 
     results = pool.starmap(doubler, tuples) 

    print(results) 
+0

ありがとうございます。これはこの単純なケースではうまくいくようです。各処理コアがデータの単一のチャンクでしか動作していないかどうかを確認する方法はありますか?つまり、データフレームの300k行を連続して操作するコードを実行すると30分かかるため、データを4つの塊に分割し、1つの塊(1つの塊)(75k行)で1つのコア作業を行うと、私は、時間を4分の1に、すなわち約7〜8分になると期待しています。しかし、これは起こっていません、シリアルコードと同じ時間がかかります。 – killerT2333

関連する問題