2016-04-17 6 views
5

カスタムフローのあるストリームがあり、特定の段階でストリームを分割し、後で再びマージする2つの代替データ処理が必要です。akkaストリームの条件に基づいた代替フロー

など。

    -> F3 -> F6 
Src -> F1 -> F2    > Merge -> Sink 
        -> F4 -> F5 

F2データはフォーマットAが含まれている場合、それはF4に行く他に、F3を流れるように行くべきと言っ条件を持っている必要があります。

私の知る限り、各フローは各方向に1つのポートしか持つことができません(または双方向の場合は2つ)。そのようなフローはどのようにサポートできますか?

答えて

11

Broadcastを使用してストリームを分割すると、各ストリームでfilterまたはcollectを使用して必要なデータをフィルタリングできます。

val split = builder.add(Broadcast[Int](2)) 

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink 
        -> filterCondB -> F4 -> F5 -> Merge 

また、出力ポートの数とポート番号f: T => Intの値からマップ関数を処理Partitionステージがあります。

val portMapper(value: T): Int = value match { 
    case CondA => 0 
    case CondB => 1 
} 

val split = builder.add(Partition[T](2, portMapper)) 

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink 
      split -> F4 -> F5 -> Merge 

おそらくもっと簡単な方法があります。

+0

ありがとう、あなたは私の日を救う。我々はvalを作成することができますfilterCondA = Flow [Int] .filter() –

関連する問題