2017-10-12 3 views
0

リスナーのためにproducerを作成しようとしています。 私のコードは、このプロデューサーのリスナー

suspend fun foo() = produce{ 
    someEvent.addListener { 
     this.send(it) 
    } 
} 

のように見えます。しかし、私は理にかなってエラーSuspension functions can be called only within coroutineを取得しています。私の質問は。コルーチンを使ってこのパターンを実装する方法はありますか?

答えて

2

それを実装するにはいくつかの方法は、あなたが達成しようとしているものに応じて、あります

あなただけ最新イベントを受信したい場合は、あなたが混同チャネルとofferメソッドを使用する必要がありますその遠かっはそれを成功:

fun foo() = produce<T>(capacity = Channel.CONFLATED) { 
    someEvent.addListener { 
     offer(it) 
    } 
} 

それはすべてのイベントを受信することが重要である場合は、あなたの選択はあなたのイベントプロデューサの動作に依存します。ここで熟考する重要な質問は、イベントプロデューサが多くのイベント「ノンストップ」を作り始める場合どうなるかです。ほとんどの "同期"イベントプロデューサは、経験則として、明示的なバックプレッシャ信号をサポートしていませんが、暗黙のバックプレッシャ信号をサポートしています。リスナが遅いかスレッドをブロックすると、遅くなります。だから、通常は、以下の解決策は、同期イベントプロデューサーのために完璧に動作します:

fun foo() = produce<T>() { 
    someEvent.addListener { 
     runBlocking { send(it) } 
    } 
} 

イベントのバッチを一度に生成されたときにケースを持っている場合も、パフォーマンスの最適化などproduceビルダーへのパラメータとして、いくつかの正capacity = xxxを指定することができますプロデューサーをブロックするのではなく、消費者が自分のペースでそれらを処理できるようにします。

まれに、プロデューサーが暗黙のブロッキングバックプレッシャーシグナル(内部同期なしでイベントを激しく生成する何らかのマルチスレッドコンピング)を理解していない場合、無制限容量のチャネルをofferしかし、プロデューサーは、消費者をoutruns場合は、メモリを実行している危険があることは注意してください:

fun foo() = produce<T>(capacity = Channel.UNLIMITED) { 
    someEvent.addListener { 
     offer(it) 
    } 
} 

あなたのプロデューサーは、(機能的反応性流れのような)明示的な背圧信号をサポートしている場合は、あなたがに特別なアダプタを使用する必要がありますコルーチンとの間で背圧信号を適切に転送します。 kotlinx.coroutinesライブラリには、この目的のためのさまざまなリアクティブライブラリを備えた多数のすぐに使用できる統合モジュールがあります。 hereを参照してください。

注:あなたはsuspend修飾子を使用してfoo機能をマークしてはいけません。 fooの呼び出しは、とにかく呼び出し側を中断しません。プロデューサのコルーチンがただちに(同期的に)開始されます。

コルーチンとさまざまな種類のチャンネルについては、the guide on kotlinx.coroutinesをよくお読みください。

+0

徹底的な答えに感謝します:) –

関連する問題