大きなpandasデータフレームを、関数引数として渡されたdaskのワーカーに渡そうとしています。私は(Xは私のデータフレームで)試してみました:パンダのデータフレームを渡して分散作業者を看過するにはどうすればいいですか?
1機能に直接データを渡す:
def test(X):
return X
f=client.submit(test, X)
f.result()
2の保存初期化機能でデータフレーム。
def worker_init(r_X):
global X
X=r_X
client.run(worker_init,X,y)
3すべてのノード間でデータフレームを散乱して、私の場合のためのバリアント作業の先物
def test(X):
return X
f_X = client.scatter(X, broadcast=True)
f = client.submit(test,f_X)
f.result()
なしを経由して、それを使って。変種1と2はほとんど同じです。 dask-schedulerはすべてのタスクでメモリを増やし、メモリが不足してタスクが失敗するまで解放しません。
私はパンダのデータフレームを渡すのではなく、ゴミを取得するため、バリアント3は機能しません。
データフレームをワーカーに送信するにはどうしたらいいですか?スケジューラにMemoryErrorがありません。
メモリ効率のよいことになっているが、それでもデータフレームを通過しないバリアント3の完全なコード:ので、あなたがデータフレームを渡す際に、入力のリストについては、
import pandas as pd
import numpy as np
from distributed import Client
client = Client('localhost:8786')
X = np.random.rand(10000,100)
X=pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)
def test(X):
return X
f = client.submit(test,f_X)
f.result()[:10]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
解決策を調べているうちに、データフレームに列名が重複しているとスキャッタが機能しないことがわかりました。しかし、これは私のコードのバグでした。重複の名前を変更した後、重複して機能しました。 –
'distributed .__ version__ == 1.16.3'の時点で、scatterは適切にシングルトン引数を受け取ります – MRocklin