2016-04-15 10 views
5

現在、処理パイプラインを処理するために手動で管理されたバックプレッシャを持つakkaアクタを使用するファイル処理ジョブがありますが、入力時にバックプレッシャを正常に管理できませんでしたファイル読み取り段階。バックプレッシャ付き行単位ファイルIOのシンク

このジョブは、各行の先頭にあるID番号で入力ファイルとグループ行を取り出し、新しいID番号の行にヒットすると、グループ化された行をメッセージ経由で処理アクターにプッシュし、新しいID番号をファイルの終わりに達するまで続けます。

それがシンクとしてファイルを使用して、アッカストリームのために良いユースケースになりますように。これは、ようだが、私はまだ三つのか分からない:私はしてファイルの行を読むことができますどのように)

1ライン?

2)どのようにすべての行に存在するIDでグループ化できますか?私は現在、これに非常に不可欠な処理を使用しています。私は、ストリームパイプラインで同じ能力を持つとは思いません。

3)データを下流で処理できるよりも速くメモリにラインを読み込まないように、バックプレッシャを適用するにはどうすればよいですか?

+0

質問:どのようにあなたが今背圧を管理していますか?あなたはシングルノードからファイルを読んでいますか?処理にakkaクラスタを使用していますか? – Aivean

+0

私は背圧を管理していません。私はいくつかのことを試しましたが、彼らはすべてハッキーでした(長い処理のような、処理主からの「Continue」メッセージを尋ね、手作業で反復する、非常に脆い)。だから私は代わりに私のアプリに十分なメモリを入力ファイル全体を消費するヒープスペースを与えることでそれをハックすることを選んだ。私はこれを共有サーバーに展開しなければならなくなり、もう誰もメモリを嫌うことはできません。 – dannytoone

答えて

7

アクカストリーム 'groupByが1つのアプローチです。しかし、groupByにはmaxSubstreamsというパラメータがあります。これは、ID範囲の最大値が正面にあることを知る必要があります。そう:溶液は、以下のサブストリームに分割するように同一IDのブロックを識別するために、scanを使用し、splitWhen:IDに

object Main extends App { 
    implicit val system = ActorSystem("system") 
    implicit val materializer = ActorMaterializer() 

    def extractId(s: String) = { 
    val a = s.split(",") 
    a(0) -> a(1) 
    } 

    val file = new File("/tmp/example.csv") 

    private val lineByLineSource = FileIO.fromFile(file) 
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
    .map(_.utf8String) 

    val future: Future[Done] = lineByLineSource 
    .map(extractId) 
    .scan((false,"",""))((l,r) => (l._2 != r._1, r._1, r._2)) 
    .drop(1) 
    .splitWhen(_._1) 
    .fold(("",Seq[String]()))((l,r) => (r._2, l._2 ++ Seq(r._3))) 
    .concatSubstreams 
    .runForeach(println) 

    private val reply = Await.result(future, 10 seconds) 
    println(s"Received $reply") 
    Await.ready(system.terminate(), 10 seconds) 
} 

extractId分割線 - >データタプル。 scanはID開始範囲フラグを持つid - >データタプルを先頭に付けます。 dropはプライマー要素をscanに落とします。 splitwhenは、各開始範囲に対して新しいサブストリームを開始します。 foldは、サブストリームをリストに連結し、IDの先頭部分のブール値を削除して、各サブストリームが単一の要素を生成するようにします。折りたたみの代わりに、単一のIDの行のストリームを処理し、ID範囲の結果を出力するカスタムSubFlowがほしいと思うかもしれません。 concatSubstreamsは、splitWhenによって生成されたID範囲のサブストリームを、runForEachで印刷された単一のストリームにマージします。

ラン:

$ cat /tmp/example.csv 
ID1,some input 
ID1,some more input 
ID1,last of ID1 
ID2,one line of ID2 
ID3,2nd before eof 
ID3,eof 

出力は次のとおりです。

(ID1,List(some input, some more input, last of ID1)) 
(ID2,List(one line of ID2)) 
(ID3,List(2nd before eof, eof)) 
+0

私はここで論理に従っていると思いますが、「マージサブストリーム」は何を説明できますか?私はAPIのドキュメントでその定義を見つけることができないようです。 – dannytoone

+0

scaladocsの他にドキュメントが見つかりませんでしたが、ステージのコメントを追加しました。また、この場合サブストリームは(ID範囲にしたがって)順番に開始および終了され、サブフローの非同期処理にかかわらず、concatはID範囲と同じ順序で出力を生成するため、 'mergeSubstreams'を' concatSubstreams'に置き換えました。 – tariksbl

0

巨大な変更を加えずにシステムに「バックプレッシャー」を加える最も簡単な方法は、アクターを使用する入力グループのメールボックスタイプをBoundedMailboxに変更するだけです。

  1. 変更高mailbox-push-timeout-timeでBoundedMailboxにあなたのラインを消費俳優の種類:

    bounded-mailbox { 
        mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" 
        mailbox-capacity = 1 
        mailbox-push-timeout-time = 1h 
    } 
    
    val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox")) 
    
  2. は(idで)作成するグループ化され、ファイルからそのイテレータからイテレータをイテレータを作成します。次に、データを循環させ、消費する俳優にグループを送ります。この場合、Actorのメールボックスがいっぱいになったときにsendがブロックされることに注意してください。それだ

    def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = { 
        def rec(s: Stream[A]): Stream[Seq[A]] = 
        if (s.isEmpty) Stream.empty else { 
         s.span(keyFun(s.head) == keyFun(_)) match { 
         case (prefix, suffix) => prefix.toList #:: rec(suffix) 
        } 
        } 
        rec(iter.toStream).toIterator 
    } 
    
    val lines = Source.fromFile("input.file").getLines() 
    
    iterGroupBy(lines){l => l.headOption}.foreach { 
        lines:Seq[String] => 
         actor.tell(lines, ActorRef.noSender) 
    } 
    

! ファイル読み込み用のものを別のスレッドに移動したい場合は、おそらくブロックされるでしょう。 mailbox-capacityを調整することによって、消費されたメモリの量を調整することもできます。 StackOverflowが生じないテストが、ファイルからのバッチをリードすると常に速く処理超える場合、Streamで実装1または2

UPDiterGroupByのように、小容量を維持するために合理的なようです。

関連する問題