2009-06-28 5 views
1

を掛けます。同時ディレクトリトラバーサルのアルゴリズムは、私は時々、すべての並列計算が終了した後にハングアップしますが、「主」スレッドが他のタスクを続行することはありません同時、再帰的なディレクトリトラバーサルとファイル処理プログラムを、作成した問題

コードは、基本的にフォーク参加スタイル同時アグリゲータであり、平行凝集が完了した後、それは、スイング・ウィンドウに結果を表示すべきです。集約の問題は、ツリーを生成し、階層内のリーフノードの統計を集約する必要があることです。

私は、同時実行のミスを犯したと確信しているが、それを見つけることができません。私はポストの最後に私のコードの関連部分を含めました(コードコメントは簡潔さのために削除されました。申し訳ありませんが150行、必要であれば、私は外部の場所に移動することができます)。

コンテキスト:Javaの6u13はWindows XP SP3、Core 2 DuoプロセッサのCPU。

私の質問は以下のとおりです。

このランダムにハングアップのための原因である可能性がありますか?

おそらくすでに存在するライブラリの形で並行ディレクトリトラバーサルを実行するより良い方法はありますか?

Doug Lea(またはJava 7)のフォーク・ジョイン・フレームワークは、集約/ディレクトリ・トラバーサルのためのより良いフレームワークでしょうか?もしそうなら、概念レベルで私の実装を再考する必要がありますか?

ありがとうございます。

とコードの抜粋:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException { 
    CountUpDown count = new CountUpDown(); 
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors 
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length]; 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     jfes[i] = new JavaFileEvaluator(files[i], count, ex); 
     ex.execute(jfes[i]); 
    } 
    count.await(); 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     final JavaFileEvaluator jfe = jfes[i]; 
     ex.execute(new Runnable() { 
      public void run() { 
       jfe.aggregate(); 
      } 
     }); 

    } 
    // ------------------------------------- 
    // this await sometimes fails to wake up 
    count.await(); // <--------------------- 
    // ------------------------------------- 
    ex.shutdown(); 
    ex.awaitTermination(0, TimeUnit.MILLISECONDS); 
    return jfes; 
} 
public class JavaFileEvaluator implements Runnable { 
    private final File srcFile; 
    private final Counters counters = new Counters(); 
    private final CountUpDown count; 
    private final ExecutorService service; 
    private List<JavaFileEvaluator> children; 
    public JavaFileEvaluator(File srcFile, 
      CountUpDown count, ExecutorService service) { 
     this.srcFile = srcFile; 
     this.count = count; 
     this.service = service; 
    } 
    public void run() { 
     try { 
      if (srcFile.isFile()) { 
       JavaSourceFactory jsf = new JavaSourceFactory(); 
       JavaParser jp = new JavaParser(jsf); 
       try { 
        counters.add(Constants.FILE_SIZE, srcFile.length()); 
        countLines(); 
        jp.parse(srcFile); 
        Iterator<?> it = jsf.getJavaSources(); 
        while (it.hasNext()) { 
         JavaSource js = (JavaSource)it.next(); 
         js.toString(); 
         processSource(js); 
        } 
       // Some catch clauses here 
       } 
      } else 
      if (srcFile.isDirectory()) { 
       processDirectory(srcFile); 
      } 
     } finally { 
      count.decrement(); 
     } 
    } 
    public void processSource(JavaSource js) { 
     // process source, left out for brevity 
    } 
    public void processDirectory(File dir) { 
     File[] files = dir.listFiles(new FileFilter() { 
      @Override 
      public boolean accept(File pathname) { 
       return 
       (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
       && !pathname.getName().startsWith(".")) 
       || (pathname.isFile() && pathname.getName().endsWith(".java") 
       && pathname.canRead()); 
      } 
     }); 
     if (files != null) { 
      Arrays.sort(files, new Comparator<File>() { 
       @Override 
       public int compare(File o1, File o2) { 
        if (o1.isDirectory() && o2.isFile()) { 
         return -1; 
        } else 
        if (o1.isFile() && o2.isDirectory()) { 
         return 1; 
        } 
        return o1.getName().compareTo(o2.getName()); 
       } 
      }); 
      for (File f : files) { 
       if (f.isFile()) { 
        counters.add(Constants.FILE, 1); 
       } else { 
        counters.add(Constants.DIR, 1); 
       } 
       JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service); 
       if (children == null) { 
        children = new ArrayList<JavaFileEvaluator>(); 
       } 
       children.add(ev); 
       count.increment(); 
       service.execute(ev); 
      } 
     } 
    } 
    public Counters getCounters() { 
     return counters; 
    } 
    public boolean hasChildren() { 
     return children != null && children.size() > 0; 
    } 
    public void aggregate() { 
     // recursively aggregate non-leaf nodes 
     if (!hasChildren()) { 
      count.decrement(); 
      return; 
     } 
     for (final JavaFileEvaluator e : children) { 
      count.increment(); 
      service.execute(new Runnable() { 
       @Override 
       public void run() { 
        e.aggregate(); 
       } 
      }); 
     } 
     count.decrement(); 
    } 
} 
public class CountUpDown { 
    private final Lock lock = new ReentrantLock(); 
    private final Condition cond = lock.newCondition(); 
    private final AtomicInteger count = new AtomicInteger(); 
    public void increment() { 
     count.incrementAndGet(); 
    } 
    public void decrement() { 
     int value = count.decrementAndGet(); 
     if (value == 0) { 
      lock.lock(); 
      try { 
       cond.signalAll(); 
      } finally { 
       lock.unlock(); 
      } 
     } else 
     if (value < 0) { 
      throw new IllegalStateException("Counter < 0 :" + value); 
     } 
    } 
    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (count.get() > 0) { 
       cond.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

編集はJavaSourceEvaluatorでhasChildren()メソッドを追加しました。 JavaFileEvaluator、count.decrement(の集計方法で

答えて

1

)は、最終的に、ブロックで呼び出されていません。任意のRuntimeExceptionsが集計関数内にスローされた場合(おそらくhasChildrenメソッドには?という本文はありません)、デクリメントの呼び出しは起こりません。CountUpDownは無期限に待機します。これはあなたが見ているランダムなハングの原因かもしれません。

2番目の質問については、私はこれを行うためのJavaでの任意のライブラリを知りませんが、私は本当に非の答えのため、申し訳ありません見ていないが、これは私があらゆる機会を持っていたものではありません前に使用する。

第3の質問がある限り、誰かが提供するfork-joinフレームワークを使用するか、独自の並行処理フレームワークを提供し続けるかは、トラバース作業を行うロジックを分離すること論理からのディレクトリは、parrallelismを管理することに関与しています。あなたが提供したコードはCountUpDownクラスを使ってすべてのスレッドがいつ終了したかを追跡し、ディレクトリトラバースを扱うメソッド全体にインクリメント/デクリメントを呼び出すことでバグを追跡する悪夢につながります。 java7 fork-joinフレームワークに移動すると、実際のトラバーサルロジックのみを扱うクラスを作成し、並行処理ロジックをフレームワークに委ねることができます。これは良い方法です。もう1つの選択肢は、あなたがここにあるものを使い続けることですが、管理ロジックと作業ロジックを明確にして、これらの種類のバグを追跡して修正するのに役立ちます。

+0

うわー、そうですよ!不足しているhasChildrenを追加しました。また、try-finallyに本体をラップして、しばらくの間ループ内でコードを実行します。ありがとう! +1。 2番目と3番目の質問にも反映できますか? – akarnokd

+0

1年前にこのコードを最初に起動したときにも、ライブラリが見つかりませんでした。このような並列トラバーサルはfjフレームワークに委託されているようです。ありがとうございました。 – akarnokd

関連する問題