実行グラフを動的に変更する代わりに何ができますか?ここに私の状況があります。記事をDBに取り込むグラフがあります。記事は異なる形式の3つのプラグインから来ます。このように、私はいくつかの流れを持っているSourceを既存のグラフに動的に追加するにはどうすればよいですか?
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))
val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()
問題は、私はニュースから一回2時間ごとに、一回24時間毎にコンテンツプロバイダからデータを取得し、それは人間から来ていますので、最後のソースはいつでも来るかもしれないということです。
私はグラフが不変であることを知っていますが、私は周期的にグラフの新しいインスタンスSource
を添付することができますので、取り込みのプロセスをシングルポイントで調整できますか?
更新:私のデータは、私のケースでは3つのソースSource
-sのストリームと言うことができます。しかし、私は外部クラス(いわゆるプラグイン)からSource
のインスタンスを取得するため、変更できません。これらのプラグインは、自分の摂取クラスとは独立して動作します。私は単一の巨大なクラスにそれらを組み合わせることはできませんSource
。
なぜ新しいソースを追加する必要があるのかは明確ではありません。あなたは、コンテンツプロバイダーからのデータ、ニュースなど手動で入力されたデータを持っていると言いました。したがって、あなたは3つの情報源を持っています。だからあなたのコードは私にはうまく見えます。 –
私は定期的に 'Source'クラスの新しいインスタンスを取得する必要がある任意の時間に取得します。したがって、私はコンテンツプロバイダから10Kの記事を摂取しているときの状況を避けたいと思います。その途中で2K項目のニュースから「ソース」を取得します。私は彼らに同時に摂取し、私のシングルスロットリングルールを尊重したい。 – expert
データストリームを 'Source'のシーケンスとしてではなく、すべてのデータを順番に出力する単一の' Source'としてモデル化することをお勧めします。そして、 'Merge'コンビネータで十分でしょう。私はあなたのデザインが、あなたが説明した状況を避ける方法は本当にわかりません。 –