2017-12-04 1 views
4

daskからベストを得る方法が混乱しています。ダンスパフォーマンス:ワークフロー疑問

問題 、私はいくつかの時系列が含まれているデータフレームを持っている(一人一人がkey独自のを持っている)、私はそれらのすべてのそれぞれに機能my_funを実行する必要があります。パンダでそれを解決する1つの方法は df = list(df.groupby("key"))で、my_fun にマルチプロセッシングを適用します。パフォーマンスは、RAMの巨大な使用にもかかわらず、私のマシンではかなり良いし、グーグルクラウドコンピューティングでひどいです。 DASKで

私の現在のワークフローは次のとおりです。

import dask.dataframe as dd 
from dask.multiprocessing import get 
  1. はS3からデータを読み込みます。 14ファイル - > 14のパーティション
  2. `df.groupby( "キー")を適用(my_fun).to_frame.compute(GET = GET)

私はインデックスを設定していなかったとしてdf.known_divisionsFalse

です。

結果グラフは enter image description here で、ボトルネックかどうかわかりません。

質問:

  1. はそれがncpuの倍数としてdf.npartitionsを持っているか、それは問題ではない、より良いですか?
  2. thisから、インデックスをキーとして設定する方が良いようです。私の推測では、私は

    DF [ "キー2"] =のDF [ "キー"] DF = df.set_index( "KEY2")

ような何かを行うことができるということですが、再度、Iドンこれが最善の方法であるかどうかは分かりません。

答えて

3

Daskの "what is taking time"のような質問の場合は、マルチプロセスではなく"distributed" schedulerを使用することをお勧めします。プロセス/スレッドは任意の数で実行できますが、診断ダッシュボード。

具体的な質問については、パーティション間でうまく分割されていない列をグループ化し、単純な集計以外のものを適用する場合は必然的にシャッフルが必要になります。インデックスを設定すると、このシャッフルが明示的なステップとして実行されます。そうしないと、タスクグラフに暗黙シャッフルが表示されます。これは多対多の操作で、各集計タスクはすべての元のパーティション、つまりボトルネックからの入力が必要です。それを回避することはありません。

パーティション数については、8コアで9パーティションのような準最適条件を持つことができます(8つのタスクを計算し、残りのコアがアイドルである間に最終タスクをブロックする可能性があります)。一般的に、ごく少数のパーティションを使用していない限り、適切なスケジューリングの決定を下すためにdaskに依存することができます。 多くの場合件の場合は大したことではありません。

+0

分散スケジューラを使用する方法が間違っているか 'client =クライアント()'と 'df = client.persist(df) – user32185

+0

これはスケジューラを設定する最も簡単な方法です実験。さまざまなパラメータを明示的に指定したり、スケジューラの現在のPythonセッションを中止したり、複数のマシンに分散させたりすることができます。http://distributed.readthedocs.io/en/latest/setupを参照してください。 html。 – mdurant