2016-11-12 4 views
2

RxJavaのバックプレッシャは実際のバックプレッシャではなく、いくつかの要素セットを無視するだけです。RxJavaでのREALバックプレッシャの最適実装

しかし、私は要素を緩めることができないと私は何とかエミションを遅くする必要がありますか?

RxJavaは要素のエミションには影響しません。したがって、開発者はそれを単独で実装する必要があります。しかしどうですか?

最も簡単な方法は、エミションをインクリメントして仕上げ時にデクリメントするカウンタを使用することです。そのような

public static void sleep(int ms) { 
    try { 
     Thread.sleep(ms); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String[] args) throws InterruptedException { 

    AtomicInteger counter = new AtomicInteger(); 

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

    Observable.create(s -> { 
     while (!s.isUnsubscribed()) { 
      if (counter.get() < 100) { 
       s.onNext(Math.random()); 
       counter.incrementAndGet(); 
      } else { 
       sleep(100); 
      } 
     } 
    }).subscribeOn(sA) 
      .flatMap(r -> 
        Observable.just(r) 
          .subscribeOn(sB) 
          .doOnNext(x -> sleep(1000)) 
          .doOnNext(x -> counter.decrementAndGet()) 
      ) 
      .subscribe(); 
} 

しかし、私はこの方法は非常に貧弱であると思います。もっと良い解決策はありますか?

答えて

-1

あなた自身に留意したように、これは実際にRxJavaとは関係ありません。
あなたは最終的にすべてのイベントを処理しなければならない場合がありますが、自分のペースで、使用キューことをしたい:

ExecutorService emiter = Executors.newSingleThreadExecutor(); 
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(4); 
    BlockingQueue<String> events = new LinkedBlockingQueue<>(); 


    emiter.submit(() -> { 
     System.out.println("I'll send 100 events as fast as I can"); 

     for (int i = 0; i < 100; i++) { 
      try { 
       events.put(UUID.randomUUID().toString()); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    workers.scheduleWithFixedDelay(
      () -> { 
       String result = null; 
       try { 
        result = events.take(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 

       System.out.println(String.format("I don't care, got %s only now", result)); 
      }, 0, 1, TimeUnit.SECONDS 
    ); 
+1

キューが背圧ない:RxJavaは合併症なくて背圧対応のフローを与えるための方法がたくさんあります。バックプレッシャのポイントは、排出量を減速させるために生産者に合図することです。それは常に可能ではないかもしれませんが、大多数の場合、そうです。このため、明示的なバックプレッシャはJDK9の新しいリアクティブストリーム仕様に含まれています。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao

2

まあ、RxJavaにおける背圧が

RxJavaの背圧本当の背圧ではありません実装は、要求チャネルを介して後続のプロデューサとコンシューマ間のノンブロッキング協調です。消費者はrequest()でいくつかの要素を求め、生産者は多くてもonNextで商品の量を生成/生成/生成し、時にはonNextの間で遅延を生成します。

ただし、いくつかの要素のセットを無視します。

これは、RxJavaにオーバーフローを明示的に伝える場合にのみ発生します。

RxJavaは要素エミションには影響しません。したがって、開発者はそれを単独で実装する必要があります。しかしどうですか? Observable.createを使用して

は、ノンブロッキング背圧を実装する方法の高度な知識を必要とし、事実上、それはライブラリのユーザに推奨されません。

Observable.range(1, 100) 
.map(v -> Math.random()) 
.subscribeOn(sA) 
.flatMap(v -> 
    Observable.just(v).subscribeOn(sB) 
    .doOnNext(x -> sleep(1000)) 
) 
.subscribe(); 

または

Observable.create(SyncOnSubscribe.createStateless(
    o -> o.onNext(Math.random()) 
) 
.subscribeOn(sA) 
... 
+0

私はバックプレッシャーについての話題を読んだ。私は、私の例では要素エミションを遅くする方法を求めています。カーソルでデータベースからレコードを読み込んでいるので、システムプロセスのレコードが遅くなり、プロデューサがそれらを発行するようにする必要があります。 – corvax

+0

これはプルベースであり、ダウンストリームが消費できるほど高速にプルします。 – akarnokd

関連する問題