2016-11-27 3 views
2

を返す1つのメソッドを持つのインターフェイスであるList<Task>があります。 List<Task>を並行して実行し、それぞれTaskを組み合わせた新しいHashMapを返すにはどうすればよいですか?RxJavaを使用して並列タスクのリストの結果を1つのHashMapに結合します。

私は現在、この持っている:

List<Task> tasks = getTasks(); 

Observable.from(tasks) 
    .flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() { 
     @Override 
     public Observable<Map<String, JsonElement>> call(Task task) { 
      return Observable.just(task.get()); 
     } 
    }); 

// group into single Map<String,JsonElement> 
// create Observable<Map<String,JsonElement>> with all results 
+0

新しい 'HashMap'にあなたの' Task'を結合/グループ化したい値によっては? –

+0

各Task.get()の結果は '' 'map.putAll()' ''を使って新しい '' 'HashMap'''で結合されるべきです –

答えて

3

使用defer契約当たりMap、あなたが好きなサイズのスレッドプールに基づいてSchedulerをカプセル化する:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5)); 
Observable.defer(() -> { 
    final Map<String, JsonElement> map = new ConcurrentHashMap<>(); 
    return Observable 
    .from(tasks) 
    .flatMap(task -> 
     Observable 
     .fromCallable(task -> task.get()) 
     .doOnNext(mp -> map.putAll(mp)) 
     .subscribeOn(scheduler)) 
    .ignoreElements() 
    .concatWith(Observable.just(map)); 
}); 

注の選択肢ことをSchedulerは実行されているタスクの性質によって異なります。 CPUが支配している場合はSchedulers.computation()に満足しているかもしれません。

関連する問題