2017-08-09 1 views
0

dask.compute(...)はブロッキング呼び出しであると予想されます。しかし、dask.computeをネストし、内側のものが(dask.dataframe.read_parquetのように)I/Oを実行すると、内部のdask.computeはブロックされません。私はのように、それぞれ8つのプロセスと2人の労働者を開始した場合ネストされたdask.computeはブロックしません。

import dask, distributed 

def outer_func(name): 
    files = find_files_for_name(name) 
    df = inner_func(files).compute() 
    # do work with df 
    return result 

def inner_func(files): 
    tasks = [ dask.dataframe.read_parquet(f) for f in files ] 
    tasks = dask.dataframe.concat(tasks) 
    return tasks 

client = distributed.Client(scheduler_file=...) 
results = dask.compute([ dask.delay(outer_func)(name) for name in names ]) 

:ここでは擬似コードの例です

dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1 

、その後、私が実行して、最大で2×8同時inner_funcを期待inner_func(ファイル)ので、 .compute()がブロックされているはずです。しかし、私が観察したことは、あるworkerプロセス内で、read_parquetステップを開始するとすぐに、別のinner_func(files).compute()が実行される可能性があるということでした。したがって、最終的に複数のinner_func(ファイル).compute()が実行され、いつかはメモリ不足のエラーが発生する可能性があります。

これが期待どおりの動作ですか?もしそうなら、ワーカープロセスごとに1つのinner_func(files).compute()を実行する方法はありますか?

+0

ここに少し混乱があるようです。 dask.dataframeは遅延オブジェクトを作成しますが、遅延/計算された関数内でこれらを作成/計算するのは正常ではありません。この関数がワーカーに送信されているとします。計算はどこで実行されると思いますか? – mdurant

+0

この例の入れ子は、実世界のデータフローIMHOでは非常に典型的です。このようなネストを避けるため、dask DataFrameのような分散データ構造を扱うことは、実際には実現可能/望ましいことではありません。 dask DataFrame APIはpandasよりも小さく、動作するシリアルコードバージョンを維持することが非常に重要であるためです。 inner_funcは、dask-workerプロセス内で複数のスレッドで実行されているようですが、例えばworkerごとに1つのスレッドしか指定しません。 dask-worker --scheduler-file sched.json --nprocs 3 - -nthreads 1 --local-directory/tmp / – user1527390

答えて

0

これは、マルチプロセススケジューラの場合とは異なります。

分散スケジューラを使用するために、dask.computeに頼るのではなく、distributed.Client APIを介してpaced job submissionを使用して回避策を見つけました。 dask.computeは単純なユースケースでは問題ありませんが、いくつの未処理タスクをスケジュールすることができるかは分かりません。したがって、この場合はシステムをオーバーランします。

ここペーシングとdask.Delayedのタスクのコレクションの実行のための擬似コードです:あなたが仕事を実行するためのDASK分散スケジューラを依頼するとき、それは船の関数のコード、および任意のデータは、

import distributed as distr 

def paced_compute(tasks, batch_size, client): 
    """ 
    Run delayed tasks, maintaining at most batch_size running at any 
    time. After the first batch is submitted, 
    submit a new job only after an existing one is finished, 
    continue until all tasks are computed and finished. 

    tasks: collection of dask.Delayed 
    client: distributed.Client obj 
    """ 
    results, tasks = [], list(tasks) 
    working_futs = client.compute(tasks[:batch_size]) 
    tasks = tasks[batch_size:] 
    ac = distr.as_completed(working_futs) 
    for fut in ac: 
     res = fut.result() 
     results.append(res) 
     if tasks: 
      job = tasks.pop() 
      ac.add(client.compute(job)) 
    return results 
0

可能であれば、異なるプロセスにあるワーカー関数に、おそらく異なるマシン上に存在します。これらのワーカープロセスは、通常のPythonコードとして実行されて、関数を忠実に実行します。重要なことは、実行中の関数がdaskワーカ上にあることを知らないということです。デフォルトでは、グローバルなdask分散クライアントが設定されていないことを確認し、通常このケースでdaskを実行しますデフォルトのスケジューラー(スレッド化されたスケジューラー)のワークロード。

タスク内で完全なdask-compute操作を実行し、それらのタスクを実行している分散スケジューラを使用する必要がある場合は、worker clientを使用する必要があります。しかし、あなたのケースでは、ネストを取り除くために仕事を言い換えることができます(上記の擬似コードのようなものですが、これも計算ではうまくいくかもしれません)。

関連する問題