2011-10-13 9 views
10

質問は単純ですが、私がそれを探してすぐにポップアップしなかったのは驚きです。Groovy/GParsを使用してCSVファイルの行を最も効率的に処理する方法は?

私は、処理が必要な、潜在的に非常に大きいCSVファイルを持っています。すべての行が処理されるまで、各行をプロセッサに渡す必要があります。 CSVファイルを読み込むために、私は次の行を与えるreadNext()メソッドを提供するOpenCSVを使用します。使用可能な行がもうない場合は、すべてのプロセッサーを終了する必要があります。

私は非常にシンプルなスクリプトを作成し、同期readNext()メソッドを定義しました(次の行の読み込みには時間がかかりません)。次の行を読み込んで処理するスレッドをいくつか作成しました。それは正常に動作しますが、...

私はちょうど使用できる組み込みのソリューションはありませんか?これは、常にメモリ内に既存のコレクションがあると仮定しているため、gparsコレクション処理ではありません。代わりに、私はそれをすべてメモリに読み込み、それを処理する余裕がありません。それは、メモリ不足の例外につながります。

だから、誰かがCSVファイルを「行単位で」処理するための素晴らしいテンプレートをワーカースレッドのカップルを使用して使用していますか?

答えて

6

ファイルに同時にアクセスするのは良い考えではないかもしれません.GParsのフォーク/結合処理は、メモリ内のデータ(コレクション)を対象としています。私の主張は、ファイルを順番にリストに読み込むことです。リストが特定のサイズに達すると、GParsを使用してリスト内のエントリを同時に処理し、リストをクリアしてから、行を読むことで移動します。

2

Grailsでこのように問題の実装をラッピングしています(grails、プレーンなhibernate、プレーンJDBCなどを使用している場合は指定しません)。

あなたが知っていることができるものは何もありません。あなたはSpring Batchとの統合を見ることができましたが、私が最後に見たとき、それは私にとって非常に重く感じられました。

プレーンJDBCを使用している場合は、おそらくクリストフが推奨することを実行するのが最も簡単です(N行を読み込み、GParsを使用してこれらの行を同時に回転させる)。

grailsまたは休止状態を使用していて、ワーカースレッドが依存関係注入のためのスプリングコンテキストにアクセスできるようにするには、少し複雑になります。

私が解決した方法はGrails Redisプラグイン(免責事項:私は作者)とJesque pluginResqueのJava実装です)を使用しています。

Jesqueプラグインを使用すると、Jesqueキューにエンキューされた作業を処理するために使用される任意のパラメータを持つ "process"メソッドを持つ "Job"クラスを作成できます。あなたは、あなたが望むだけ多くの労働者をスピンアップすることができます。

管理者ユーザーがファイルを投稿できるファイルアップロードがあります。ファイルをディスクに保存し、作成したProducerJobのジョブをエンキューします。 ProducerJobがファイルを回転すると、各行ごとにConsumerJobがピックアップするようにメッセージがエンキューされます。このメッセージは、単にCSVファイルから読み取った値のマップです。

ConsumerJobは、これらの値を受け取り、それに対応する適切なドメインオブジェクトを作成し、データベースに保存します。

すでにRedisをプロダクションで使用していたので、これをキューイングのメカニズムとして使用することは意味がありました。私たちには古い同期ロードがあり、ファイルロードを連続して実行しました。私は現在、1人のプロデューサーワーカーと4人の消費者ワーカーを使用しています。この方法でロードすると、以前のロードよりも100倍以上高速です(エンドユーザーへの進捗状況がはるかに改善されています)。

これは比較的一般的なものであるため、このようなパッケージのためのスペースがおそらく存在することに私は同意します。

更新日:a blog post with a simple example doing imports with Redis + Jesqueを付けました。

5

これは俳優には良い問題かもしれません。同期リーダアクタは、CSVラインを並列のプロセッサアクタに渡すことができます。例:

@Grab(group='org.codehaus.gpars', module='gpars', version='0.12') 

import groovyx.gpars.actor.DefaultActor 
import groovyx.gpars.actor.Actor 

class CsvReader extends DefaultActor { 
    void act() { 
     loop { 
      react { 
       reply readCsv() 
      } 
     } 
    } 
} 

class CsvProcessor extends DefaultActor { 
    Actor reader 
    void act() { 
     loop { 
      reader.send(null) 
      react { 
       if (it == null) 
        terminate() 
       else 
        processCsv(it) 
      } 
     } 
    } 
} 

def N_PROCESSORS = 10 
def reader = new CsvReader().start() 
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join() 
+0

この例では、readCsv()呼び出しがCSVの1行を返すとしますか?ちょうど私がこの権利を読んでいることを確認したい。 – Scott

+0

はい、 'readCsv()'は各行を順に読み込みます。ファイルの終わりに達すると、ヌルが返され、プロセッサは終わりに達したことを知り、 'terminate()'しなければなりません。 – ataylor

関連する問題