2017-01-23 7 views
1

2.Xから3.Xに移行しようとしています。 https://github.com/reactor/reactor-core/issues/375 EventBusはアプリケーション(低レイテンシーFXシステム)のイベントマネージャーとして使用されており、非常にうまく機能します。プロジェクト用原子炉プロセッサーv3.X

変更後、私たちはすべてのモジュールを取り出し、イベントを処理する独自のプロセッサを作成することに決めました。 1.この使用法はあなたの立場から正しいと思われますか?現在の段階で文書の欠落があり、我々ができることをすべて確認した後で、ここで何をすべきか分からないから 2. X間隔ごとにアクションを実行するためにFluxを使用しようとしました たとえば:Market is arriving for 1000 1秒ですが、アップデートを1秒間に4回しか処理しません。アップグレード後、次のものを使用しています:

バッファを搭載し、別の方法に送信するプロセッサ。 この方法ではFluxがリストを取得し、タスクを完了するために並行して作業しようとします。 私たちが持っていた2つの主要な問題: 1.時には我々は我々のシステムは、私は多分仮定我々は、ハンドラ内のイベント上記の例では、プロセッサ

//Definition of processor 
    ReplayProcessor<Event> classAEventProcessor = ReplayProcessor.create(); 

    //Event handler subscribing 
    public void onMyEventX(Consumer<Event> consumer) { 
     Flux<Event> handler = classAEventProcessor .filter(event -> event.getType().equals(EVENT_X)); 
     handler.subscribe(consumer); 
    } 

を使用してミスしているに送信していることを見つけることができませんNullのイベントを受信しときどき彼がストリームを停止します私たちはサーバーを再作成するまで(再起動時にのみ、プロセッサの作成を行っているため)

2.私たちは並列化しようとしましたが、メッセージの一部が消えてしまった我々はフレームワークを誤用している。

//On constructor 
    tickProcessor.buffer(1024, Duration.of(250, ChronoUnit.MILLIS)).subscribe(markets -> 
    handleMarkets(markets)); 

    //Handler 
    Flux.fromIterable(getListToProcess()) 
     .parallel() 
    .runOn(Schedulers.parallel()) 
    .doOnNext(entryMap -> { 
     DoBlockingWork(entryMap); 
    }) 
    .sequential() 
    .subscribe(); 

これは、プロセッサが250msごとにウェイクアップしてハンドラを呼び出すことを意図しています。このハンドラは、より速くより速い処理を行うために、Fluxの並列処理を行います。 EventBusは、米国で包まれ、すべてのイベントが包まれたイベントマネージャを投げる加入しました: は* DoBlockingWorkが250msの以上かかる場合、私は行動

UPDATEがどうなるか理解できませんでした。 これで、すべてのモジュールのイベントプロセッサーを作成しようとしましたが、非常に遅く動作します。私たちはThreadExecutorでTopicProcessorを使いましたが、まだ非常に遅いです。EventBusは高速で同じ作業をしました 誰もが考えていますか?私がDirectProcessorを使用しようとしたとき、TopicProcessorがもっとうまくいくように思えます。

答えて

0

リアクター3は可能な限りブロックすることを避けるべきであるというコンセプトを基に構築されています。

イベントはどのように生成されますか?それらを取得するためにリスナーベースの非同期APIを使用していますか?もしそうなら、Flux.createを試してみてください。

"1秒で1000個のイベントがありますが、4秒間だけ処理したい"というユースケースについては、sample演算子をチェーンします。例えば、sample(Duration.ofMillis(250))は、各秒を4つのウィンドウに分割し、そこから最後の要素のみを出力します。

外部の記事や学習資料へのリンクを見つけることができるリファレンスガイドが書かれています.WIPリファレンスガイドhereと学習リソースページhereのプレビューがあります。

+1

お返事ありがとうございます。私は避けようとしているが、これは何とかそれをパラレルにしなければならない "重いアクション"の例である。 v3に移動した後。イベントバスは使用できません。私はすべてのモジュールの一般的なEventBusの代わりにすべてのモジュールのTopicProcessorに切り替えようとしましたが、それは本当に遅かったです... ThreadExecuterを与えようとしましたが、まだ非常に遅いです。 今私たちのイベントは非常に遅く、並行してうまくいきません。試してみるべきことは何ですか?我々は本当に私たちが考えることができるすべてを試したという手がかりを持っていません。私が見つけたすべての文書を読んでいました – Aviad

+1

私はプロセッサに何か間違っていると思いますか?それらの間にはどのような違いがありますか(DirectPrcessorなど)? 私は実際に何を使用しているのかを説明することができたし、間違いを避けるためのベストプラクティスの助けが必要であり、EventBus – Aviad

関連する問題