2012-01-05 10 views
2

Javaでプログラムを作成する必要があります。これは、ディレクトリツリー内の比較的大きな数(〜50,000)のファイルを読み取り、データを処理し、処理されたデータを別の(フラット)ディレクトリ。複数のファイルを並列に読み書きする

現在、私はこのようなものがあります:

private void crawlDirectoyAndProcessFiles(File directory) { 
    for (File file : directory.listFiles()) { 
    if (file.isDirectory()) { 
     crawlDirectoyAndProcessFiles(file); 
    } else { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
    } 
} 

を、彼らはすべての作業の罰金を、それらの各メソッドを除去し、読みやすくするためにダウンにトリミングされていることを言えば十分、しかし。それは遅いことを除いて、全体のプロセスは正常に動作します。データの処理はリモートサービスを介して行われ、5〜15秒かかります。それを50,000倍してください...

私は以前に何もマルチスレッドを行ったことはありませんでしたが、私は私がすれば私はかなり良いスピードの増加を得ることができます。どのように私は効果的にこのメソッドを並列化することができますいくつかのポインタを誰も与えることができますか?

+0

どのくらいのファイルが処理されていますか?私は、ディスクからファイルを読み込むのにもっと時間がかかると、実際にスレッド化することで実際には得られないからです。 – SimonC

+1

ディスクバインドの可能性があるタスクを並列化することによるスピードアップはありません。別の物理ドライブにあるディレクトリを並列化しようとしている場合を除き... – Mysticial

+0

出力を1つのファイルにするのか、ファイルごとに行うのですか? – MahdeTo

答えて

5

私は希望スレッドを管理するにはThreadPoolExecutorを使用してください。あなたはこのような何かを行うことができます。

private class Processor implements Runnable { 
    private final File file; 

    public Processor(File file) { 
     this.file = file; 
    } 

    @Override 
    public void run() { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
} 

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) { 
    for (File file : directory.listFiles()) { 
     if (file.isDirectory()) { 
      crawlDirectoryAndProcessFiles(file,executor); 
     } else { 
      executor.execute(new Processor(file); 
     } 
    } 
} 

あなたが使用して執行を得るでしょう:

poolSizeは一度行くしたいスレッドの最大数がある
ExecutorService executor = Executors.newFixedThreadPool(poolSize); 

。 (ここで合理的な数字をとることが重要です; 50,000のスレッドは良いアイデアではありません)合理的な数値は8かもしれません。すべてのファイルをキューに入れた後、メインスレッドは、 executor.awaitTermination

+0

元のメソッドの振る舞いに合わせて、すべての処理が完了したことを確認するために、最後にも「結合」を考慮してください。 –

+1

@AdrianShum - 良い点。最近、 'ExecutorService.awaitTermination()' –

+1

を使用することをお勧めします。この例は、 'new ForkJoinPool(numprocs)'で使用可能なフォークジョインプールで動作します。集約プロセスは実際にはこれらのプールで最も効果的ですが、フィボナッチシーケンスのような小さなプロセスはシングルスレッドまたはスレッドエグゼキュータ(最適化されたカスタムコードの方が優れています)で最適です。 –

1

もっとも簡単な方法は、スレッドプールを用意することです(対応するExecutorを見てください)。メインスレッドはディレクトリ内をクロールする役割を担います。ファイルに遭遇したら、「ジョブ」(Runnable/Callable)を作成し、Executorにジョブを処理させます。

(これあなたは、呼び出し可能など一部執行を読んだ後、あなたが把握することは難しいことではありませんだって、私はあまり具体的なコードを与えていない好む開始するために十分なものでなければならない)

5

単一のハードディスク(SSDまたはRAIDアレイ、ネットワークファイルシステムなどではなく、単一の同時読み取り操作のみを許可するもの)があると仮定すると、IOを実行する1つのスレッド/ディスクへの書き込み)。また、コアを持っているものと同じくらい多くのスレッドがCPUバインド操作をしたいだけです。そうしないと、コンテキスト切り替えで時間が浪費されます。

上記の制限が適用される場合、以下のコードが役立ちます。シングルスレッドエグゼキュータは、いつでも1つのRunnableが実行されることを保証します。固定スレッドプールにより、一度に実行されるのはNUM_CPUSRunnableです。

これがしないことの1つは、処理が完了したときのフィードバックを提供することです。

private final static int NUM_CPUS = 4; 

private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor(); 
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS); 

private final class Data {} 
private final class ProcessedData {} 

private final class FileReader implements Runnable 
{ 
    private final File _file; 
    FileReader(final File file) { _file = file; } 
    @Override public void run() 
    { 
    final Data data = readFile(_file); 
    _fileProcessor.execute(new FileProcessor(_file, data)); 
    } 

    private Data readFile(File file) { /* ... */ return null; }  
} 

private final class FileProcessor implements Runnable 
{ 
    private final File _file; 
    private final Data _data; 
    FileProcessor(final File file, final Data data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    final ProcessedData processedData = processData(_data); 
    _fileReaderWriter.execute(new FileWriter(_file, processedData)); 
    } 

    private ProcessedData processData(final Data data) { /* ... */ return null; } 
} 

private final class FileWriter implements Runnable 
{ 
    private final File _file; 
    private final ProcessedData _data; 
    FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    writeFile(_file, _data); 
    } 

    private Data writeFile(final File file, final ProcessedData data) { /* ... */ return null; } 
} 

public void process(final File file) 
{ 
    if (file.isDirectory()) 
    { 
    for (final File subFile : file.listFiles()) 
     process(subFile); 
    } 
    else 
    { 
    _fileReaderWriter.execute(new FileReader(file)); 
    } 
} 
+0

'_fileReaderWriter.execute(new FileWriter(_file、processedData));'を呼び出すと、これは非同期呼び出しですか? –

+0

はい、新しいタスクを '_fileReaderWriter'のキューに追加して、そのスレッドで実行されます。 – SimonC

関連する問題