2016-06-21 6 views
4

実行グラフを動的に変更する代わりに何ができますか?ここに私の状況があります。記事をDBに取り込むグラフがあります。記事は異なる形式の3つのプラグインから来ます。このように、私はいくつかの流れを持っているSourceを既存のグラフに動的に追加するにはどうすればよいですか?

val converterFlow1: Flow[ImpArticle, Article, NotUsed] 
val converterFlow2: Flow[NewsArticle, Article, NotUsed] 
val sinkDB: Sink[Article, Future[Done]] 

// These are being created every time I poll plugins  
val sourceContentProvider : Source[ImpArticle, NotUsed] 
val sourceNews : Source[NewsArticle, NotUsed] 
val sourceCit : Source[Article, NotUsed] 

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1), 
    sourceNews.via(converterFlow2), 
    sourceCit)(Merge(_)) 

val res = merged 
    .buffer(10, OverflowStrategy.backpressure) 
    .toMat(sinkDB)(Keep.both) 
    .run() 

問題は、私はニュースから一回2時間ごとに、一回24時間毎にコンテンツプロバイダからデータを取得し、それは人間から来ていますので、最後のソースはいつでも来るかもしれないということです。

私はグラフが不変であることを知っていますが、私は周期的にグラフの新しいインスタンスSourceを添付することができますので、取り込みのプロセスをシングルポイントで調整できますか?

更新:私のデータは、私のケースでは3つのソースSource -sのストリームと言うことができます。しかし、私は外部クラス(いわゆるプラグイン)からSourceのインスタンスを取得するため、変更できません。これらのプラグインは、自分の摂取クラスとは独立して動作します。私は単一の巨大なクラスにそれらを組み合わせることはできませんSource

+0

なぜ新しいソースを追加する必要があるのか​​は明確ではありません。あなたは、コンテンツプロバイダーからのデータ、ニュースなど手動で入力されたデータを持っていると言いました。したがって、あなたは3つの情報源を持っています。だからあなたのコードは私にはうまく見えます。 –

+0

私は定期的に 'Source'クラスの新しいインスタンスを取得する必要がある任意の時間に取得します。したがって、私はコンテンツプロバイダから10Kの記事を摂取しているときの状況を避けたいと思います。その途中で2K項目のニュースから「ソース」を取得します。私は彼らに同時に摂取し、私のシングルスロットリングルールを尊重したい。 – expert

+0

データストリームを 'Source'のシーケンスとしてではなく、すべてのデータを順番に出力する単一の' Source'としてモデル化することをお勧めします。そして、 'Merge'コンビネータで十分でしょう。私はあなたのデザインが、あなたが説明した状況を避ける方法は本当にわかりません。 –

答えて

2

一般的には、ソースのストリームを単一のソースに結合すること、つまりSource[Source[T, _], Whatever]からSource[T, Whatever]に移動するのが正しい方法です。これはflatMapConcatまたはflatMapMergeで行うことができます。したがって、Source[Source[Article, NotUsed], NotUsed]を取得できる場合は、flatMap*のいずれかを使用して、最終的にSource[Article, NotUsed]を取得することができます。あなたのソース(意図していない)のそれぞれに対してそれを行い、あなたの元のアプローチがうまくいくはずです。

1

あなたはSource[Source[_,_],_]としてそれをモデル化することができない場合は、私が使用して検討したいSource.queue[Source[T,_]](queueSize, overflowStrategy):あなたは提出が失敗した場合に何が起こるかですが、注意する必要がありますどのようなhere

+0

Victor、http://stackoverflow.com/q/38033362/226895でお手伝いできますか? – expert

1

私は、Vladimir Matveevによって与えられた答えに基づいてコードを実装しました。それは、私にとってはよく使われているように見えるので、他の人と共有したいと思います。

私は約Source.queueを知っていたが、Viktor Klangが述べたが、私はflatMapConcatを知らなかった。それは純粋なawesomenessです。

implicit val system = ActorSystem("root") 
implicit val executor = system.dispatcher 
implicit val materializer = ActorMaterializer() 

case class ImpArticle(text: String) 
case class NewsArticle(text: String) 
case class Article(text: String) 

val converterFlow1: Flow[ImpArticle, Article, NotUsed] = Flow[ImpArticle].map(a => Article("a:" + a.text)) 
val converterFlow2: Flow[NewsArticle, Article, NotUsed] = Flow[NewsArticle].map(a => Article("a:" + a.text)) 
val sinkDB: Sink[Article, Future[Done]] = Sink.foreach { a => 
    Thread.sleep(1000) 
    println(a) 
} 

// These are being created every time I poll plugins 
val sourceContentProvider: Source[ImpArticle, NotUsed] = Source(List(ImpArticle("cp1"), ImpArticle("cp2"))) 
val sourceNews: Source[NewsArticle, NotUsed] = Source(List(NewsArticle("news1"), NewsArticle("news2"))) 
val sourceCit: Source[Article, NotUsed] = Source(List(Article("a1"), Article("a2"))) 

val (queue, completionFut) = Source 
    .queue[Source[Article, NotUsed]](10, backpressure) 
    .flatMapConcat(identity) 
    .buffer(2, OverflowStrategy.backpressure) 
    .toMat(sinkDB)(Keep.both) 
    .run() 

queue.offer(sourceContentProvider.via(converterFlow1)) 
queue.offer(sourceNews.via(converterFlow2)) 
queue.offer(sourceCit) 
queue.complete() 

completionFut.onComplete { 
    case Success(res) => 
    println(res) 
    system.terminate() 
    case Failure(ex) => 
    ex.printStackTrace() 
    system.terminate() 
} 

Await.result(system.whenTerminated, Duration.Inf) 

私はまだqueue.offerによって返さFutureの成功を確認したいが、私の場合には、これらの呼び出しはかなりまれになります。

関連する問題