2016-04-05 21 views
7

を収集します。 「tl; dr」コードを貼り付ける代わりに、私はあなたに絵を見せて、それを説明します。簡単に言えば enter image description hereは、私は私の実際のプロジェクトでは、この問題が発生したため、私のテストコードとプロファイラによって証明しましたメカニズム

、私は共有のものを持っていないし、お互いを気にしない、どちらも、2 Future sから結果を得るためにFuture.firstCompletedOfを使用しています。私が対処したい質問であっても、Futureの両方が完了するまで、ガベージコレクタは最初のResultオブジェクトをリサイクルできません

だから私はこの背後にあるメカニズムについては本当に興味があります。誰かがそれをより低いレベルから説明することができますか、または私が調べるためのヒントを提供することができます。

ありがとうございます!

PS:同じものを共有しているのですか?ExecutionContext

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val completeFirst: Try[T] => Unit = p tryComplete _ 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

{ futures foreach { _ onComplete completeFirst }を行う場合、機能{ _ onComplete completeFirst }がどこか ExecutionContext.executeを経由して保存されます。

**更新**ペーストのテストコード要求として

object Main extends App{ 
    println("Test start") 

    val timeout = 30000 

    trait Result { 
    val id: Int 
    val str = "I'm short" 
    } 
    class BigObject(val id: Int) extends Result{ 
    override val str = "really big str" 
    } 

    def guardian = Future({ 
    Thread.sleep(timeout) 
    new Result { val id = 99999 } 
    }) 

    def worker(i: Int) = Future({ 
    Thread.sleep(100) 
    new BigObject(i) 
    }) 

    for (i <- Range(1, 1000)){ 
    println("round " + i) 
    Thread.sleep(20) 
    Future.firstCompletedOf(Seq(
     guardian, 
     worker(i) 
    )).map(r => println("result" + r.id)) 
    } 

    while (true){ 
    Thread.sleep(2000) 
    } 
} 
+0

は、私はあなたが、私は反対のことを言うので、「結果が」ゴミを収集することができないことを証明するために管理方法についての骨董品だ、それは面白いかもしれません。これをどのように確認したかについての詳細を追加することがありますか? –

+0

コードを表示します。それがなければ何が起こっているのかは言うまでもありません。 –

+0

実際、この問題は一般的な問題であり、特定のユースケースに依存しないため、詳細を問わずに答えることができます。 –

答えて

9

はのは、firstCompletedOfが実装されている方法を見てみましょう。この関数が正確に保存されている場所は無関係ですが、 のどこかに保存しなければならないことがわかっているので、あとで選択してスレッドを利用できるようになるとスレッドプールで実行できます。

この関数は、pで終了するcompleteFirstで終了します。 だから限り1つの未来がまだあるとして(futuresから)完了するのを待って、それはゴミを収集することがないようにpへの参照があります(その時点で可能性がfirstCompletedOfはすでにからpを取り除く、戻ってきたことであっても、スタック)。

最初の未来が完了すると、結果は(p.tryCompleteを呼び出して)約束事に保存されます。 約束はpが結果を保持しているので、少なくともpがreachablleである限り結果は到達可能であり、futuresから少なくとも1回は未完了である限り、pに到達可能であることがわかりました。 これは、すべての先物が完了する前に結果を収集できない理由です。

更新: 問題は次のとおりです。修正できましたか?私はそれができると思う。私たちがやらなければならないことは、pへの参照をスレッドセーフな方法で「ヌルアウト」するための最初の未来が、AtomicReferenceを使って行うことができることを保証することだけです。このような何か:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val pref = new java.util.concurrent.atomic.AtomicReference(p) 
    val completeFirst: Try[T] => Unit = { result: Try[T] => 
    val promise = pref.getAndSet(null) 
    if (promise != null) { 
     promise.tryComplete(result) 
    } 
    } 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

私はそれをテストし、それが結果は、すぐに最初の将来が完了するとガベージコレクトすることができない期待通りにしています。他の点でも同じように動作するはずです。

+0

私のためにそれを壊してくれてありがとう、私は 'firstCompletedOf'をかなり待っていて、それを理解できませんでした。そして、結論は直感にはかなりのもので、誰かがそれに不満を抱いていたかどうかはわかりません。 – noru

+0

私はこの状況を修正すべき代替実装を追加しました。それがあなたのために働くかどうか私に教えてください(これは、標準ライブラリへのプルリクエストを保証するかもしれません)。 –

+0

それは私が観察したようにうまく動作します。スレッドはまだ占有されていますが、まったく別の話です。ご協力いただきありがとうございます! – noru

関連する問題