2017-08-24 2 views
1

私はdask.distributedを使用してさまざまな種類のデータ処理パイプラインを実装しています。通常、元のデータはS3から読み込まれ、処理された(大きな)コレクションはS3のCSVに書き込まれます。ファイル/ CSVに非同期にDaskコレクションを保存する

処理を非同期に実行して進捗状況を監視できますが、コレクションをファイルに保存するすべてのto_xxx()メソッドが同期呼び出しのようです。その欠点の1つは、コールがブロックされる可能性があり、潜在的に非常に長い時間です。第二に、私は後で実行される完全なグラフを簡単に構築できません。

実行する方法はありますか。 to_csv()を非同期で呼び出し、ブロックするのではなく将来のオブジェクトを取得しますか?

PS:私は非同期ストレージを自分で実装できると確信しています。コレクションをdelayed()に変換し、各パーティションを格納することでしかし、それは一般的なケースのように思えます - 私はすでにフレームワークに含まれているようなものを持っていることがうまくいくだろう既存の機能を欠場しない限り。

答えて

1

ほとんどのto_*ファンクションにはcompute=Trueキーワード引数があり、これはcompute=Falseに置き換えることができます。これらの場合、遅延値のシーケンスが返され、非同期に計算できます。

values = df.to_csv('s3://...', compute=False) 
futures = client.compute(values) 
関連する問題