2016-11-28 17 views
1

2005年から2010年までの日付ごとに1つのピクルファイルがあります。各ファイルには、その日付のそれぞれの頻度の単語の辞書が含まれています。私はまた、全期間にわたってすべてのユニークな単語を持つ "マスターファイル"を持っています。合計で約500万語があります。複数の入力を持つluigiタスクのアーキテクチャ

私はすべてのデータを取り、単語ごとに1つのCSVファイルを作成する必要があります。これは日付ごとに1つの行を持ちます。例えば、サンプルファイルsome_word.txtのために:

2005-01-01,0.0003 
2005-01-02,0.00034 
2005-01-03,0.008 

私はトラブルルイージフレームワークと、このプロセスを組織を抱えています。私の現在のトップレベルのタスクは単語を取り、日付ごとに関連する頻度を調べ、その結果をCSVファイルに保存します。私はマスターファイル内のすべての単語をループしてその単語でタスクを実行することができますが、それ以上の時間がかからなければ数ヶ月かかると推定しています。私のトップレベルのAggregateTokenFreqsタスクは簡略版です。

class AggregateTokenFreqs(luigi.Task): 
    word = luigi.Parameter() 

    def requires(self): 
     pass # not sure what to require here, master file? 

    def output(self): 
     return luigi.LocalTarget('data/{}.csv'.format(self.word)) 

    def run(self): 
     results = [] 
     for date_ in some_list_of_dates: 
      with open('pickles/{}.p'.format(date_), 'rb') as f: 
       freqs = pickle.load(f) 
       results.append((date_, freqs.get(self.word)) 

     # Write results list to output CSV file 
+1

実行する必要がある処理は何ですか?たとえば、新しい日のデータが到着したときに毎日のプロセスを再実行する計画ですか?あなたが一度だけ実行する必要がある場合は、おそらくluigiを実行する意味がありません。いずれにせよ、マルチプロセッシングを使う方が良いでしょう。 – MattMcKnight

答えて

0

@MattMcKnightは、マルチプロセッシングを使用する方が良いかもしれないと言います。しかし、あなたがルイージを使用したい場合は、以下を実行してください。

  • ルイージはあなたが設定する労働者の概念を持っています。これは、異なるタスクを並行して実行するローカルプロセスの数です。
  • すべてのピクルスを「ループ」する代わりにタスクをモデル化し、1つのピクルスをタスクにパラメータとして渡すことができます。結果は、一意の名前を持つディレクトリのTSVに書き込む必要があります。
  • 各ピクル(日付)ごとにタスクを作成するループを持っています。ワーカー数を設定します(つまり5)。あなたは同時に5つのファイルを処理することができます。
  • 個別のCSVファイルをすべて「結合」する追加のタスクが必要になります。

これが役に立ちます。

関連する問題