2017-12-08 4 views
3

私はJava 8 Streamsで作業しています。私はオブジェクトを生成するカスタム関数foo()を持っていて、それが作成するオブジェクトをパラレルストリームにしたいと思います。 foo()はスレッドセーフではありません。Stream.generate(foo).parallel()の場合、fooはスレッドセーフでなければなりませんか?

Stream.generate(foo).parallel()と書くと、foo()は非同期で呼び出されますか?私。オブジェクトが連続して生成され、並列スレッドに渡されるか、複数のスレッドがそれぞれを呼び出して必要に応じてオブジェクトを生成しますか?

答えて

3

データレースは保証行動ではありませんが、次のコード

System.out.println(
    Stream.generate(new Supplier<Integer>() { 
     int i; @Override public Integer get() { return i++; } 
    }).parallel() 
     .limit(10_000) 
     .collect(BitSet::new, BitSet::set, BitSet::or) 
     .cardinality() 
); 

は、再現性の番号を印刷しますサプライヤがスレッドセーフではない場合に、更新が不足している可能性があることを実証しています。

結果の評価に必要以上に多くの要素がサプライヤに問い合わせされる可能性もあります。例えば。

LongAdder adder = new LongAdder(); 
System.out.println(
    Stream.generate(new Supplier<Integer>() { 
     int i; @Override public Integer get() { adder.increment(); return i++; } 
    }).parallel() 
     .limit(10_000) 
     .collect(BitSet::new, BitSet::set, BitSet::or) 
     .cardinality() 
); 
System.out.println("queried "+adder+" times"); 

は、通常、10,000を超える数のクエリをレポートしますが、同時に、データ競合のために10,000個未満の異なる要素がレポートされます。

サプライヤのスレッドを安全にすると、結果は10,000個の異なる要素の正しい数に変更されますが、サプライヤは依然として10,000回以上クエリされる可能性があります。その結果、0〜9,999 generateで作成されたストリームはであり、の順不同であるため、サプライヤーからの10,000の異なる番号を使用できます。

+0

私は2つの例のどちらにも言及していません。さて、最初の並べ替えは意味があります。共有変数は複数のスレッドから更新されます。つまり、サプライヤはスレッドセーフでなければなりませんが、2番目は 'limit' perseです。 – Eugene

+1

@Eugene:サプライヤがスレッドセーフであっても、消費された要素に対してのみクエリが実行されることを想定してはいけません。これはすべての短絡動作(および無限ストリームには短絡動作が必要です)に適用されます。 'limit'は単なる明らかな例です。そして、 'generate'によって返されたストリームが*順序付けられていない*であることを見落とすのは簡単です。そのメソッドを見ると誰かの心に入ってくる多くのユースケースが、それについてもう一度考えるときに実際には不適切なものになるかもしれません... – Holger

+0

@Holger I上記の説明を理解してください、それは微妙で重要なポイントです – MyStackRunnethOver

5

サプライヤーは、あなたが迅速な実験で観察できるように、複数のスレッドから呼び出されます。

Stream.generate(() -> Thread.currentThread().getId()) 
    .parallel() 
    .limit(100000) 
    .distinct() 
    .forEach(System.out::println); 
+1

複数のスレッドから呼び出されても、必ずしも同時に呼び出されるわけではありません。と比較する。 'forEachOrdered'アクションは異なるスレッドから呼び出すことができますが、同時に実行することはできません。したがって、このテストでは、サプライヤがスレッドセーフでなければならないかどうかを判断するには十分ではありません。 (ただし、この場合、サプライヤーは実際に並行して評価され、スレッドセーフでなければなりません)。 – Holger

関連する問題