2.Xから3.Xに移行しようとしています。 https://github.com/reactor/reactor-core/issues/375 EventBusはアプリケーション(低レイテンシーFXシステム)のイベントマネージャーとして使用されており、非常にうまく機能します。プロジェクト用原子炉プロセッサーv3.X
変更後、私たちはすべてのモジュールを取り出し、イベントを処理する独自のプロセッサを作成することに決めました。 1.この使用法はあなたの立場から正しいと思われますか?現在の段階で文書の欠落があり、我々ができることをすべて確認した後で、ここで何をすべきか分からないから 2. X間隔ごとにアクションを実行するためにFluxを使用しようとしました たとえば:Market is arriving for 1000 1秒ですが、アップデートを1秒間に4回しか処理しません。アップグレード後、次のものを使用しています:
バッファを搭載し、別の方法に送信するプロセッサ。 この方法ではFluxがリストを取得し、タスクを完了するために並行して作業しようとします。 私たちが持っていた2つの主要な問題: 1.時には我々は我々のシステムは、私は多分仮定我々は、ハンドラ内のイベント上記の例では、プロセッサ
//Definition of processor
ReplayProcessor<Event> classAEventProcessor = ReplayProcessor.create();
//Event handler subscribing
public void onMyEventX(Consumer<Event> consumer) {
Flux<Event> handler = classAEventProcessor .filter(event -> event.getType().equals(EVENT_X));
handler.subscribe(consumer);
}
を使用してミスしているに送信していることを見つけることができませんNullのイベントを受信しときどき彼がストリームを停止します私たちはサーバーを再作成するまで(再起動時にのみ、プロセッサの作成を行っているため)
2.私たちは並列化しようとしましたが、メッセージの一部が消えてしまった我々はフレームワークを誤用している。
//On constructor
tickProcessor.buffer(1024, Duration.of(250, ChronoUnit.MILLIS)).subscribe(markets ->
handleMarkets(markets));
//Handler
Flux.fromIterable(getListToProcess())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(entryMap -> {
DoBlockingWork(entryMap);
})
.sequential()
.subscribe();
これは、プロセッサが250msごとにウェイクアップしてハンドラを呼び出すことを意図しています。このハンドラは、より速くより速い処理を行うために、Fluxの並列処理を行います。 EventBusは、米国で包まれ、すべてのイベントが包まれたイベントマネージャを投げる加入しました: は* DoBlockingWorkが250msの以上かかる場合、私は行動
UPDATEがどうなるか理解できませんでした。 これで、すべてのモジュールのイベントプロセッサーを作成しようとしましたが、非常に遅く動作します。私たちはThreadExecutorでTopicProcessorを使いましたが、まだ非常に遅いです。EventBusは高速で同じ作業をしました 誰もが考えていますか?私がDirectProcessorを使用しようとしたとき、TopicProcessorがもっとうまくいくように思えます。
お返事ありがとうございます。私は避けようとしているが、これは何とかそれをパラレルにしなければならない "重いアクション"の例である。 v3に移動した後。イベントバスは使用できません。私はすべてのモジュールの一般的なEventBusの代わりにすべてのモジュールのTopicProcessorに切り替えようとしましたが、それは本当に遅かったです... ThreadExecuterを与えようとしましたが、まだ非常に遅いです。 今私たちのイベントは非常に遅く、並行してうまくいきません。試してみるべきことは何ですか?我々は本当に私たちが考えることができるすべてを試したという手がかりを持っていません。私が見つけたすべての文書を読んでいました – Aviad
私はプロセッサに何か間違っていると思いますか?それらの間にはどのような違いがありますか(DirectPrcessorなど)? 私は実際に何を使用しているのかを説明することができたし、間違いを避けるためのベストプラクティスの助けが必要であり、EventBus – Aviad