アクカストリーム 'groupBy
が1つのアプローチです。しかし、groupByにはmaxSubstreams
というパラメータがあります。これは、ID範囲の最大値が正面にあることを知る必要があります。そう:溶液は、以下のサブストリームに分割するように同一IDのブロックを識別するために、scan
を使用し、splitWhen
:IDに
object Main extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
def extractId(s: String) = {
val a = s.split(",")
a(0) -> a(1)
}
val file = new File("/tmp/example.csv")
private val lineByLineSource = FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
val future: Future[Done] = lineByLineSource
.map(extractId)
.scan((false,"",""))((l,r) => (l._2 != r._1, r._1, r._2))
.drop(1)
.splitWhen(_._1)
.fold(("",Seq[String]()))((l,r) => (r._2, l._2 ++ Seq(r._3)))
.concatSubstreams
.runForeach(println)
private val reply = Await.result(future, 10 seconds)
println(s"Received $reply")
Await.ready(system.terminate(), 10 seconds)
}
extractId
分割線 - >データタプル。 scan
はID開始範囲フラグを持つid - >データタプルを先頭に付けます。 drop
はプライマー要素をscan
に落とします。 splitwhen
は、各開始範囲に対して新しいサブストリームを開始します。 fold
は、サブストリームをリストに連結し、IDの先頭部分のブール値を削除して、各サブストリームが単一の要素を生成するようにします。折りたたみの代わりに、単一のIDの行のストリームを処理し、ID範囲の結果を出力するカスタムSubFlow
がほしいと思うかもしれません。 concatSubstreams
は、splitWhenによって生成されたID範囲のサブストリームを、runForEach
で印刷された単一のストリームにマージします。
ラン:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof
出力は次のとおりです。
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
質問:どのようにあなたが今背圧を管理していますか?あなたはシングルノードからファイルを読んでいますか?処理にakkaクラスタを使用していますか? – Aivean
私は背圧を管理していません。私はいくつかのことを試しましたが、彼らはすべてハッキーでした(長い処理のような、処理主からの「Continue」メッセージを尋ね、手作業で反復する、非常に脆い)。だから私は代わりに私のアプリに十分なメモリを入力ファイル全体を消費するヒープスペースを与えることでそれをハックすることを選んだ。私はこれを共有サーバーに展開しなければならなくなり、もう誰もメモリを嫌うことはできません。 – dannytoone