それを実装するにはいくつかの方法は、あなたが達成しようとしているものに応じて、あります
あなただけ最新イベントを受信したい場合は、あなたが混同チャネルとoffer
メソッドを使用する必要がありますその遠かっはそれを成功:
fun foo() = produce<T>(capacity = Channel.CONFLATED) {
someEvent.addListener {
offer(it)
}
}
それはすべてのイベントを受信することが重要である場合は、あなたの選択はあなたのイベントプロデューサの動作に依存します。ここで熟考する重要な質問は、イベントプロデューサが多くのイベント「ノンストップ」を作り始める場合どうなるかです。ほとんどの "同期"イベントプロデューサは、経験則として、明示的なバックプレッシャ信号をサポートしていませんが、暗黙のバックプレッシャ信号をサポートしています。リスナが遅いかスレッドをブロックすると、遅くなります。だから、通常は、以下の解決策は、同期イベントプロデューサーのために完璧に動作します:
fun foo() = produce<T>() {
someEvent.addListener {
runBlocking { send(it) }
}
}
イベントのバッチを一度に生成されたときにケースを持っている場合も、パフォーマンスの最適化などproduce
ビルダーへのパラメータとして、いくつかの正capacity = xxx
を指定することができますプロデューサーをブロックするのではなく、消費者が自分のペースでそれらを処理できるようにします。
まれに、プロデューサーが暗黙のブロッキングバックプレッシャーシグナル(内部同期なしでイベントを激しく生成する何らかのマルチスレッドコンピング)を理解していない場合、無制限容量のチャネルをoffer
しかし、プロデューサーは、消費者をoutruns場合は、メモリを実行している危険があることは注意してください:
fun foo() = produce<T>(capacity = Channel.UNLIMITED) {
someEvent.addListener {
offer(it)
}
}
あなたのプロデューサーは、(機能的反応性流れのような)明示的な背圧信号をサポートしている場合は、あなたがに特別なアダプタを使用する必要がありますコルーチンとの間で背圧信号を適切に転送します。 kotlinx.coroutines
ライブラリには、この目的のためのさまざまなリアクティブライブラリを備えた多数のすぐに使用できる統合モジュールがあります。 hereを参照してください。
注:あなたはがsuspend
修飾子を使用してfoo
機能をマークしてはいけません。 foo
の呼び出しは、とにかく呼び出し側を中断しません。プロデューサのコルーチンがただちに(同期的に)開始されます。
コルーチンとさまざまな種類のチャンネルについては、the guide on kotlinx.coroutinesをよくお読みください。
徹底的な答えに感謝します:) –