2016-11-23 9 views
6

に基づいアッカ流の流れを制御しないが、私は2つのソースを持っていると言う:方法1は、別のストリーム

val ticks = Source(1 to 10) 
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable]) 

私は現在の値に基づいて、アッカストリームでGraph[...]処理ステップを作成したいのですが値ストリーム内で可能な限り消費します。どのようにこの動作を実装し

(1, None) 
(2, None) 
(3, Some(Seq(3))) 
(4, Some(Seq(4, 4))) 
(5, None) 
(6, None) 
(7, Some(Seq(7))) 
(8, Some(Seq(8,8,8,8))) 
(9, Some(Seq(9))) 
(10, None) 

:値が一致するときに、たとえば、私はそうでないような出力が得られカチカチを維持し、第2のソースに一致するすべての要素を返すようにしたいですか?

答えて

1

私はあなたがこのテーマに関するアッカストリームのドキュメントを見てみましょうお勧めします:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

サイトによると、あなたはこのようにGraphStageを実装することができます

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] { 

val in = Inlet[E]("AccumulateWhileUnchanged.in") 
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out") 

override def shape = FlowShape(in, out) 
} 

この件に関するブログ記事もあります:http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

希望するもの:

+0

詳細を教えてください。あなたの答えは、それはあなたが与えられた質問を解決するための舞台を提供していないカスタムステージを書くことが可能だと述べている... –

関連する問題