2017-03-05 4 views
1

も私のために働いていないようです。アイテムは、そこに定義されたシンクにならない。ここに私が持っているものがあります。に定義されているシンクには何も来ません。

val merged: Source[ArticleWithKeywords, _] = ... 
val (ks, fut) = merged 
    .alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink)) 
    .map(_.id) 
    .groupedWithin(100, 5 seconds) 
    .mapAsync(4) { ids => runReferenceFetching(ids) } 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(Sink.ignore)(Keep.both) 
    .run() 

しかし、項目がrunReferenceFetchingに達しています。私は何が欠けていますか?

+0

'queueManager.getIdsForAnsSink'を' Sink.foreach'に置き換えても、期待どおりに動作しませんか? (またはそれがそれを修正した場合、そのシンクで何か間違っている/予期せぬことがあります) – johanandren

+0

@johanandren 'Sink.foreach'で動作します。しかし、一方では単純な 'Sourse(List(..))。runWith(queueManager.getIdsForAnsSink)も機能します。だから私はここで何が失敗しているのか混乱している。 – expert

+0

また、実際にはいずれかの下流ブランチバックプレッシャーがある場合に背圧をかけるブロードキャストステージなので、要素がシンクに到達していても何とか失われてしまうと言います。多分並行性の問題? – johanandren

答えて

0

この問題は、alsoToとは関係ありません。問題は、Source.fromPublisherを使用して作成されたシンクです。私は誤って同じPublisher[T]を使用して複数のシンクを作成できると思っていました。もう1つのシンクがすでに存在していたので、もう1つは動作しませんでした。

関連する問題