2017-11-23 1 views
0

私は、csvで何らかの処理を行うためにスケジューラと4つのワーカーノードをセットアップしました。 csvのサイズはわずか300 MBです。私はこれを行うとき私は、daskデータフレームでのpersistの結果である先物のコレクションを持っています。それらの操作を遅らせる方法は?

df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True) 

df = df.groupby(['col_1','col_2']).agg('mean').reset_index() 
df = client.persist(df) 



def create_sep_futures(symbol,df):  

    symbol_df = copy.deepcopy(df[df['symbol' == symbol]]) 

    return symbol_df 
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st] 

future = client.compute(lazy_values) 
result = client.gather(future) 

番目のリストは、1000個の要素

が含まれている、私はこのエラーを取得する:

distributed.worker - WARNING - Compute Failed 
Function: create_sep_futures 
args:  ('PHG',  symbol col_3 col_2 \ 
0    A   1.451261e+09    23.512857 
1    A   1.451866e+09    23.886857 
2    A   1.452470e+09    25.080429 

kwargs: {} 
Exception: KeyError(False,) 

私の仮定は、労働者がその上に完全なデータフレームとクエリを取得する必要があることです。しかし、私はそれがブロックを取得し、それをしようとすると思います。

どのような回避策がありますか?データフレームのチャンクはすでにワーカーのメモリに格納されています。私は各作業者にデータフレームを移動させたくありません。

答えて

0

データフレームの構文とAPIを使用するデータフレームの操作は、デフォルトで遅延(遅延)しているので、何もする必要はありません。

最初の問題:構文が間違っていますdf[df['symbol' == symbol]] =>df[df['symbol'] == symbol]です。それがFalseキーの由来です。

ですから、おそらく探しているソリューション:あなたがを行う場合

future = client.compute(df[df['symbol'] == symbol]) 

はあなたが正常な機能を使用しての世話をするdf.map_partitions、に見ることができ、別途チャンクに仕事をしたいですデータまたは遅延/先物を渡すかdf.to_delayedとなり、遅れた機能で使用できる一連の遅延オブジェクトが得られます。

関連する問題