2017-07-31 1 views
5

私は、複数のコルーチンから消費し、resultChannelにプッシュバックするアイテムを作成しています。プロデューサーは最後の項目の後にチャンネルを閉じています。ファンアウト/ファンインクローズ結果チャンネル

resultChannelが決して閉じられないので、コードは決して終了しません。どのように反復を正しく検出して終了するかhasNext() return false

val inputData = (0..99).map { "Input$it" } 
val threads = 10 

val bundleProducer = produce<String>(CommonPool, threads) { 
    inputData.forEach { item -> 
     send(item) 
     println("Producing: $item") 
    } 

    println("Producing finished") 
    close() 
} 

val resultChannel = Channel<String>(threads) 

repeat(threads) { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

val iterator = object : Iterator<String> { 
    val iterator = resultChannel.iterator() 
    override fun hasNext() = runBlocking { iterator.hasNext() } 
    override fun next() = runBlocking { iterator.next() } 
}.asSequence() 

println("Starting interation...") 

val result = iterator.toList() 

println("finish: ${result.size}") 
+0

私が見つけたそれを行うためのhackish方法は、結果のシーケンスに(100).takeすることですが、私はそれが下の構造を残してどのような状態ではわからない。 – atok

答えて

3

あなたは、消費者が終了するのを待って、その後resultChannelを閉じコルーチンを実行することができます。

まず、Jobの節約するために、消費者を開始し、コードの書き換え:

val jobs = (1..threads).map { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

をそして、すべてのJob Sが終わったら、チャンネルを閉じ、別のコルーチンを実行します。

launch(CommonPool) { 
    jobs.forEach { it.join() } 
    resultChannel.close() 
}