私は、csvで何らかの処理を行うためにスケジューラと4つのワーカーノードをセットアップしました。 csvのサイズはわずか300 MBです。私はこれを行うとき私は、daskデータフレームでのpersistの結果である先物のコレクションを持っています。それらの操作を遅らせる方法は?
df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)
df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)
def create_sep_futures(symbol,df):
symbol_df = copy.deepcopy(df[df['symbol' == symbol]])
return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]
future = client.compute(lazy_values)
result = client.gather(future)
番目のリストは、1000個の要素
が含まれている、私はこのエラーを取得する:
distributed.worker - WARNING - Compute Failed
Function: create_sep_futures
args: ('PHG', symbol col_3 col_2 \
0 A 1.451261e+09 23.512857
1 A 1.451866e+09 23.886857
2 A 1.452470e+09 25.080429
kwargs: {}
Exception: KeyError(False,)
私の仮定は、労働者がその上に完全なデータフレームとクエリを取得する必要があることです。しかし、私はそれがブロックを取得し、それをしようとすると思います。
どのような回避策がありますか?データフレームのチャンクはすでにワーカーのメモリに格納されています。私は各作業者にデータフレームを移動させたくありません。