2017-11-22 14 views
1

497パンダのデータフレームが.parquetファイルとして格納されたフォルダがあります。フォルダの合計サイズは7.6GBです。Dask.delayedはクラス内の.compute()クラスではありません

私は単純な取引システムを開発しようとしています。したがって、私は2つの異なるクラスを作成します。主なものはPortfolioクラスです。このクラスは、データフォルダ内のすべての単一データフレームに対してAssetオブジェクトを作成します。

import os 
import pandas as pd 
from dask.delayed import delayed 


class Asset(file): 
    def __init__: 
     self.data_path = 'path\\to\\data\\folder\\' 
     self.data = pd.read_parquet(self.data_path + file, engine='auto') 

class Portfolio: 
    def __init__: 
     self.data_path = 'path\\to\\data\\folder\\' 
     self.files_list = [file for file in os.listdir(self.data_path) if file.endswith('.parquet')] 
     self.assets_list = [] 
     self.results = None 
     self.shared_data = '???' 

    def assets_loading(self): 
     for file in self.files_list: 
      tmp = Asset(file) 
      self.assets_list.append(tmp) 

    def dask_delayed(self): 
     for asset in self.assets_list: 
      backtest = delayed(self.model)(asset) 

    def dask_compute(self): 
     self.results = delayed(dask_delayed) 
     self.results.compute() 

    def model(self, asset): 
     # do shet 

if __name__ == '__main__': 
    portfolio = Portfolio() 
    portfolio.dask_compute() 

結果が処理されていないように見えます。私はチェックしようとした場合、コンソール版画portfolio.results:

Out[5]: Delayed('NoneType-7512ffcc-3b10-445f-928a-f01c01bae29c') 

だからここは私の質問は以下のとおりです。

  1. あなたは間違っているものを私に説明できますか?
  2. assets_loading()関数を実行すると、基本的に高速の処理速度のためにデータフォルダ全体がメモリにロードされますが、RAM(16GB)が飽和します。私は7.6GBのフォルダが16GBのRAMを飽和させるとは思わなかったので、Daskを使いたいのです。私のスクリプトワークフローと互換性のあるソリューションはありますか?
  3. もう1つ問題があり、おそらく大きな問題があります。 Daskでは、同時に複数のアセットに渡ってモデル関数を並列化しようとしていますが、各Daskプロセス内にある変数の値をPortfolioオブジェクトに保存するために共有メモリ(スクリプト内のself.shared_data)が必要ですたとえば、単一資産の年間パフォーマンス)。 Dask遅延プロセス間でデータを共有する方法と、このデータをPortfolioの変数に格納する方法を教えてください。

ありがとうたくさん

答えて

0

はラインself.results = delayed(dask_delayed)と間違っていくつかのものがあります。ここでは

  • はあなたが遅れ機能ではなく、遅れた結果を作成しています。あなたはおそらく、あなただけ遅れて、遅延機能のために存在していません.compute()を(呼び出して何も
  • を返さないdask_delayedself.dask_delayed
  • 方法を意味し、
  • dask_delayedが、ここで定義されていない遅れ関数を呼び出す必要があります結果)、出力は保存されません。想定しているように、コンピューティングはインプレースで行われません。

おそらく

self.result = delayed(self.dask_delayed)().compute() 

を望んでいた今、あなたはそれが何かを返すように、dask_delayed()を修正する必要があります。それ自体がすでに遅延しているので、それ以上の遅延関数を呼び出すべきではありません。

最後に、メモリをpd.read_parquetで埋めるために、メモリのバージョンのデータが大きいほど、圧縮/エンコーディングが寄木張りのフォーマットの目的の1つであることは驚きではありません。あなたは怠惰な/オンデマンドのdask.dataframe.read_parquetを試してみることができます。

関連する問題