2013-12-18 16 views
9

私はリアクティブプログラミングとRxJavaを探求しています。楽しいですが、私は答えが見つからない問題に固執しています。私の基本的な質問:無限に稼働しているObservableを終了させるための反応的に適切な方法は何ですか?また、私のコードに関する批評や反応的なベストプラクティスを歓迎します。RxJava - 無限ストリームを終了する

練習として、ログファイルのテールユーティリティを作成しています。ログファイル内の行のストリームはObservable<String>で表されます。 BufferedReaderにファイルに追加されたテキストの読み込みを続けるには、通常のreader.readLine() == null終了チェックを無視して、スレッドをスリープさせてより多くのロガーテキストを待つことを意味するように解釈します。

しかし、私はtakeUntilを使用してオブザーバを終了できますが、無限に実行されているファイルウォッチャを終了するには、きれいな方法を見つける必要があります。私は独自のterminateWatcherメソッド/フィールドを書くことができますが、これはObservable/Observerカプセル化を壊してしまい、可能な限り反応的なパラダイムとして厳格にしていきたいと思います。ここで

Observable<String>コードです:ここでは

public class FileWatcher implements OnSubscribeFunc<String> { 
    private Path path = . . .; 

    @Override 
    // The <? super String> generic is pointless but required by the compiler 
    public Subscription onSubscribe(Observer<? super String> observer) { 
     try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { 
      String newLine = ""; 
      while (!Thread.interrupted()) { // How do I terminate this reactively? 
       if ((newLine = reader.readLine()) != null) 
        observer.onNext(newLine); 
       else 
        try { 
         // Wait for more text 
         Thread.sleep(250); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
        } 
      } 
      observer.onCompleted(); 
     } catch (Exception e) { 
      observer.onError(e); 
     } 

     return null; // Not sure what Subscription I should return 
    } 
} 

は、彼らが来るように新しい行を出力オブザーバーコードです:

public static void main(String... args) { 
    . . . 
    Observable<String> lines = Observable.create(createWatcher(file)); 
    lines = lines.takeWhile(new Func1<String, Boolean>() { 
     @Override 
     public Boolean call(String line) { 
      // Predicate for which to continue processing 
      return !line.contains("shutdown"); 
     } 
    }).subscribeOn(Schedulers.threadPoolForIO()) 
      .observeOn(Schedulers.currentThread()); 
    // Seems like I should use subscribeOn() and observeOn(), but they 
    // make my tailer terminate without reading any text. 

    Subscription subscription = lines.subscribe(new Action1<String>() { 
     @Override 
     public void call(String line) { 
      System.out.printf("%20s\t%s\n", file, line); 
     } 
    }); 
} 

私の二つの質問は次のとおりです。

  1. 何無限に実行されているストリームを終了するための反応的に一貫した方法ですか?
  2. 私のコードの他の間違いはあなたを泣かせますか?サブスクリプションが要求されたときに見ていたファイルを開始しているので、サブスクリプションが終了したときにSubscriptionが閉じているときに:)

答えて

6

、それは、すなわち、それを終了するために理にかなっています。これはコードコメントの緩やかな終点の1つに対処します。Subscriptionを返します。これはFileWatcherにファイルの再生を停止するよう指示します。次に、現在のスレッドが中断されているかどうかをチェックするのではなく、サブスクリプションがキャンセルされているかどうかをチェックするようにループ条件を置き換えます。

あなたのonSubscribe()メソッドが返されない場合、問題はそれほど役に立ちません。おそらくあなたのFileWatcherはスケジューラを指定する必要があります。そのスケジューラで読み込みが発生しています。

+0

スケジューラは、ログファイルの読み取り頻度を定義できます。スケジューラーがFileWatcherを実行すると、FileWatcherは 'readLine()== null'まで行を引き出します。 Observerは通常のように購読を解除するか、 'takeUntil()'を使って自動的に購読を解除することができます。私はそれが機能するかどうかを見るためにこれらの方法を調べなければならないでしょうが、私はその考えが好きです。 – MonkeyWithDarts

関連する問題