私はリアクティブプログラミングとRxJavaを探求しています。楽しいですが、私は答えが見つからない問題に固執しています。私の基本的な質問:無限に稼働しているObservableを終了させるための反応的に適切な方法は何ですか?また、私のコードに関する批評や反応的なベストプラクティスを歓迎します。RxJava - 無限ストリームを終了する
練習として、ログファイルのテールユーティリティを作成しています。ログファイル内の行のストリームはObservable<String>
で表されます。 BufferedReader
にファイルに追加されたテキストの読み込みを続けるには、通常のreader.readLine() == null
終了チェックを無視して、スレッドをスリープさせてより多くのロガーテキストを待つことを意味するように解釈します。
しかし、私はtakeUntil
を使用してオブザーバを終了できますが、無限に実行されているファイルウォッチャを終了するには、きれいな方法を見つける必要があります。私は独自のterminateWatcher
メソッド/フィールドを書くことができますが、これはObservable/Observerカプセル化を壊してしまい、可能な限り反応的なパラダイムとして厳格にしていきたいと思います。ここで
はObservable<String>
コードです:ここでは
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
は、彼らが来るように新しい行を出力オブザーバーコードです:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
私の二つの質問は次のとおりです。
- 何無限に実行されているストリームを終了するための反応的に一貫した方法ですか?
- 私のコードの他の間違いはあなたを泣かせますか?サブスクリプションが要求されたときに見ていたファイルを開始しているので、サブスクリプションが終了したときに
Subscription
が閉じているときに:)
スケジューラは、ログファイルの読み取り頻度を定義できます。スケジューラーがFileWatcherを実行すると、FileWatcherは 'readLine()== null'まで行を引き出します。 Observerは通常のように購読を解除するか、 'takeUntil()'を使って自動的に購読を解除することができます。私はそれが機能するかどうかを見るためにこれらの方法を調べなければならないでしょうが、私はその考えが好きです。 – MonkeyWithDarts