2016-04-29 17 views
10

私はあなたがカスタムForkJoinPoolなしでparallelStreamを使用すると、デフォルトでForkJoinPoolを使用することを知っています。 (そしてまた、その質問の他の回答で)stated hereようなぜ並列ストリームはForkJoinPoolのすべてのスレッドを使用しないのですか?

、より多くの並列性を有するために、あなたがする必要があります。

は、あなた自身のForkJoinPoolに並列ストリームの実行を提出する:yourFJP.submit( () - > stream.parallel()。forEach(doSomething));

だから、私はこれをしなかった:

import java.util.Set; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ForkJoinPool; 
import java.util.stream.IntStream; 
import com.google.common.collect.Sets; 

public class Main { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 

     ForkJoinPool forkJoinPool = new ForkJoinPool(1000); 

     IntStream stream = IntStream.range(0, 999999); 

     final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>()); 

     forkJoinPool.submit(() -> { 
      stream.parallel().forEach(n -> { 

       System.out.println("Processing n: " + n); 
       try { 
        Thread.sleep(500); 
        thNames.add(Thread.currentThread().getName()); 
        System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount()); 
       } catch (Exception e) { 
        throw new RuntimeException(e); 
       } 
      }); 
     }).get(); 
    } 
} 

は、私が作成した、ともプールがあり、アクティブなスレッドの数をログに記録されているどのように多くのスレッドを見るために、スレッド名のセットを作って、両方の数値が16以上にならないので、ここでの並列性は16以上ではありません(なぜ16ですか?)。 forkJoinPoolを使用しないと、私は4つの並列性を得ています。これは、私が持っているプロセッサーの数に従います。

なぜ1000ではなく16であるのですか?

+0

@DavidSchwartzしかし、私が参照した記事をチェックしてください。そこには、これを行うと、親プールのすべてのスレッドが使用されます。 –

+0

ああ、 'thNames'自体はスレッドセーフではなく、多くのスレッドから参照して変更しようとしていることに注意してください。 –

+0

また、私は思っていません - そうは思わないですが、ストリームの構築を '.submit'に渡すラムダに移そうとするかもしれません。 –

答えて

-4

ラムダがラムダをFJPに提出すると、ラムダはFJPではなく一般プールを使用するように思えます。 Sotirios Delimanolisは、上記のコメントでこれを証明しました。あなたが提出しているものはあなたのFJPで実行されるタスクです。

このコードをプロファイリングして、実際に使用されているスレッドを確認してください。

あなたはできません。は、FJP内のスレッドの名前です。

+2

Nono。たぶん私はコメントを間違って言いました。タスクは、それが提出された 'ForkJoinPool'インスタンスのコンテキストで実行する必要があります。これは、現在実行中のスレッドのタイプを調べることで行います。 'ForkJoinWorkerThread'型であれば、それを作成した' ForkJoinPool'にアクセスできます。それを使用して、他の並列タスクを送信することができます。適切なスレッドファクトリを提供することで、カスタム 'ForkJoinPool'のスレッドに名前を付けることができます。 –

9

更新

もともとこの答えはForkJoinPoolは背圧を適用し、ストリームを処理するために利用できるアイドル労働者が常に存在するためにも、規定の並列処理のレベルに達していないと主張精巧な説明でした。

これは間違っています。

実際の答えはこれが重複としてマークされていたために、元の質問で提供される - ストリーム処理のためのカスタムForkJoinPoolを使用することが正式にサポートされていない、とforEachを使用した場合、デフォルト・プールの並列処理がストリームspliteratorを決定するために使用されます動作。このことを指摘してとソティリオスにスチュアート・マークス

for (int i = 0; i < 1_000_000; ++i) { 
    forkJoinPool.submit(() -> { 
     try { 
     Thread.sleep(1); 
     thNames.add(Thread.currentThread().getName()); 
     System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism()); 
     } catch (Exception e) { 
     throw new RuntimeException(e); 
     } 
    }); 
} 

ありがとう:

はここでタスクを手動でカスタムForkJoinPoolに提出されている方法の例だと、プールのアクティブなスレッド数は簡単にその並列処理のレベルに達します私の元の答えが間違っていると主張するためのDelimanolis :)

+0

スチュアートマークスが指摘したリンクがありますか? – Brice

+0

@ブリスク重複した質問のスチュアートの答えを参照してください。 – holocronweaver

+0

@holocronweaver OK、ありがとう! – Brice

関連する問題