2016-08-10 10 views
3

私はいくつかの非常に単純なコードを持っており、一連の文字列を読む&フィルタを適用します。私はフィルタが複数のスレッドで動作することを期待していました。ログからRxJavaの並列処理 - フィルタ

Iterable<String> outputs = Observable 
      .from(Files.readLines(new File("E:\\SAMA\\Test\\ImageNetBullets.txt"), Charset.forName("utf-8"))) 
      .take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str)) 
      .toBlocking().toIterable(); 

、Filterメソッドは、ちょうど1スレッドで実行されているようだ。私はそれをスピードアップするにはどうすればよい

In Thread pool-1-thread-1 
In Thread pool-1-thread-1 
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg 
In Thread pool-1-thread-1 
In Thread pool-1-thread-1 

を?

+1

RxJavaは本質的にシーケンシャルなので、例えばflatMapを使って手動でパラレルを追加する必要があります。 – akarnokd

+0

そのステートメントは意味を成しません - ReactiveXのタイトルにも "非同期"が含まれています。実際、シーケンシャル、シングルスレッド実行と非同期、パラレル - 明らかに 'Observable'の**を選択することは**簡単です** [イントロで見る](https://reactivex.io/ intro.html)、さらに。上記の例は、連続して読み込まれるストリームの意味での反復的な順次実行を示しています.RxJavaは、データセット全体を多くのObservableに分割し、単一の並列オブジェクトに戻すことを可能にします。 – specializt

答えて

5

RxJavaは事実上シーケンシャルです。例えば、map(Func1)を使用して、Func1自体は親配列を通過する値と非並行して実行されます

ここ
Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println); 

、ラムダV - > V * vが10でスルー値1と呼ぶことにしますシーケンシャルな方法。

RxJavaは、パイプラインのステージ(範囲 - >マップ - サブスクライブ)が互いに並行/並行して実行できるように非同期にすることができます。例えば:ここ

Observable.range(1, 10) 
.subscribeOn(Schedulers.computation()) 
.map(v -> v * v)      // (1) 
.observeOn(Schedulers.io()) 
.map(v -> -v)       // (2) 
.toBlocking() 
.subscribe(System.out::println);  // (3) 

、(1)(2)と並行して実行することができる、(3)、すなわち、(2)AV = * 3 3を算出しながら、(1)既にV = 5を計算することができます(3)同時に-1を印刷しています。あなたが同時にシーケンスの要素を処理したい場合には

、あなたは「フォークアウト」サブObservable Sへのシーケンスは、その後、flatMapで結果をバックに参加する必要があります。ここでは

Observable.range(1, 10) 
.flatMap(v -> 
    Observable.just(v) 
    .subscribeOn(Schedulers.computation()) 
    .map(v -> v * v) 
) 
.toBlocking() 
.subscribe(System.out::println); 

、各値 vは、バックグラウンドスレッドで実行され、map()経由で計算される新しい Observableを開始します。 v = 1はスレッド1上で実行されてもよく、 v = 2はスレッド2上で実行されてもよく、 v = 3はスレッド1上で実行されてもよいが、 v = 1が計算された後であってもよい。

+0

美しい説明! – user2849678

2

.subscribeOnを呼び出すと、オブザーバブルが開始されることが決まります(そして、すべての排出量はスケジューラが提供する1つのスレッド上を移動します)。

ストリーム内の各項目を処理する必要があまりない場合は、処理がIOによって支配される可能性があり、並列処理が役に立たない可能性があります。

Observable<String> outputs = 
    lines 
    .buffer(1000) 
    .flatMap(list -> 
     Observable 
      .from(list) 
      //do something computationally expensive 
      .filter(line -> intensive(line)) 
      .subscribeOn(Schedulers.computation())); 

少ないオーバーヘッドがだからbufferワークのまともなチャンクをスケジュールするために使用されている:一つのアプローチは、Schedulers.computation()に加入しているflatMap内チャンク及びプロセスに各チャンクのストリームをバッファリングすることであるが一般的に言えば

たくさんの小さなタスクをスケジュールするよりも。