2017-08-22 1 views
1

私たちは以下を実装する必要があります。キューから固定数の入力を処理するDaskアプリケーションを構成する方法はありますか?

  • のRedisからJSON文書を取得することは
  • 抽出し、JSON文書を解析します:チャンネルから消費メッセージごとに

    1. :メッセージの既知の数を提供しますRedisのチャネルを考えます結果のリストには、単一の結果を生成するために、すべての結果オブジェクト間で

  • 集計オブジェクト

  • 多くの作業者にステップ1と2の両方を配布し、すべての結果をメモリに集めないようにします。また、両方のステップでプログレスバーを表示したいと考えています。

    しかし、私たちは進捗状況を見て、不適切な時間としてブロックせずにシステム内を動くように、アプリケーションを構造化する良い方法はありません。

    たとえば、Redisチャンネルからキューに読み込んだ場合、キューをDaskに渡すことができます。この場合、すべてのメッセージを待たずに各メッセージを処理します。キューを使用すると進捗状況を表示する方法はありません(おそらくキューのサイズが不明であると思われます)

    Redisチャンネルからリストを収集してDaskに渡すと進行状況を見ることができますが、最初の処理を開始する前に、Redisのすべてのメッセージを待たなければなりません。

    このような問題にアプローチするには、推奨される方法はありますか?

    答えて

    1

    Redisチャンネルが並行アクセスで安全な場合は、チャンネルから要素を取り出すために多くの先物を提出することがあります。これらは異なるマシン上で実行されます。

    from dask.distributed import Client, progress 
    client = Client(...) 
    
    futures = [client.submit(pull_from_redis_channel, ..., pure=False) for _ in range(n_items)] 
    futures2 = client.map(process, futures) 
    
    progress(futures2) 
    
    +0

    非常に良い!私はRedis pub/subを使っていましたが、これはうまくいきませんでしたが、LPUSH + BLPOPはポイントポイントのキューのように動作し、上記のアプローチでとてもうまく動作します。ありがとう –

    関連する問題