2017-05-14 3 views
1

大きなpandasデータフレームを、関数引数として渡されたdaskのワーカーに渡そうとしています。私は(Xは私のデータフレームで)試してみました:パンダのデータフレームを渡して分散作業者を看過するにはどうすればいいですか?

1機能に直接データを渡す:

def test(X): 
    return X 
f=client.submit(test, X) 
f.result() 

2の保存初期化機能でデータフレーム。

def worker_init(r_X): 
    global X 
    X=r_X 
client.run(worker_init,X,y) 

3すべてのノード間でデータフレームを散乱して、私の場合のためのバリアント作業の先物

def test(X): 
    return X 
f_X = client.scatter(X, broadcast=True) 
f = client.submit(test,f_X) 
f.result() 

なしを経由して、それを使って。変種1と2はほとんど同じです。 dask-schedulerはすべてのタスクでメモリを増やし、メモリが不足してタスクが失敗するまで解放しません。

私はパンダのデータフレームを渡すのではなく、ゴミを取得するため、バリアント3は機能しません。

データフレームをワーカーに送信するにはどうしたらいいですか?スケジューラにMemoryErrorがありません。

メモリ効率のよいことになっているが、それでもデータフレームを通過しないバリアント3の完全なコード:ので、あなたがデータフレームを渡す際に、入力のリストについては、

import pandas as pd 
import numpy as np 
from distributed import Client 
client = Client('localhost:8786') 
X = np.random.rand(10000,100) 
X=pd.DataFrame(X) 
f_X = client.scatter(X, broadcast=True) 
def test(X): 
    return X 
f = client.submit(test,f_X) 
f.result()[:10] 

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

答えて

2

client.scatterチェックを、あなた誤ってそれをシリーズのリストに展開しています。あなたはする必要があります f_X = client.scatter([X], broadcast=True)

今、あなたはすべての労働者に1つのデータフレームがあります。ここでf_Xは1つの未来を含むリストなので、f = client.submit(test,f_X[0])が必要になります。

一般に、クライアントから渡すのではなく、ワーカーの関数内でデータを生成/ロードすることができます。明らかに、すべてをローカルメモリに収め、そのデータをコピーし、シリアル化コストを必要とします。途中で

+0

解決策を調べているうちに、データフレームに列名が重複しているとスキャッタが機能しないことがわかりました。しかし、これは私のコードのバグでした。重複の名前を変更した後、重複して機能しました。 –

+0

'distributed .__ version__ == 1.16.3'の時点で、scatterは適切にシングルトン引数を受け取ります – MRocklin

関連する問題