私たちは、実際にバイナリファイルを処理するアッカストリームを使用(Akka Streams with Scala!は近い私がNIO /バイナリ側を除いて必要なものすべてに対処されます)。この周りの任意のドキュメントがなかったとして物事が軌道に乗るには少しトリッキーだったが、これは私たちが思い付いたものです:あなたが先に行くことができSource[Byte]
アッカである、あなたはbinSource
を持っていたら
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)
とストリーム変換(map
、flatMap
、transform
など)を適用し始めます。この機能は、Iterable
を受け取り、データを遅く読み込み、変換に使用できるようにするStream
というスカラを渡して、Source
コンパニオンオブジェクトのapply
を利用します。
EDIT
コンラートは、コメント欄で指摘したように、流れが原因それを遅延ストリームを構築だとして、それが遭遇した要素のメモ化を行っているという事実に大きなファイルで問題となることがあります。これは、あなたが慎重でない場合、メモリ不足の状態につながる可能性があります。あなたがStreamのためにドキュメントを見ればしかし、メモリ内に構築メモ化を避けるためのヒントがあります:
一つは、メモ化の慎重でなければなりません。気をつけなければ大量のメモリを大量に大量に食べることができます。その理由は、 ストリームのメモは、 scala.collection.immutable.Listのような構造を作成するからです。何かが頭部を に保持している限り、頭部は尾部を保持しているので、再帰的に を続けます。一方、 の頭に何も保持されていない場合(たとえば、ストリームを定義するためにdefを使用した場合)、もう一度 が直接使用されなくなると、それは消えます。
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binSource = Source(() => binStream(inputStream).iterator)
def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
ので、ここでの考え方は、取得、その後すぐval
に割り当てるとdef
経由Stream
を構築していないことです。だから、次のようにあなたが私の元の例を修正することができ、アカウントにそれを取って
それからiterator
を使用して、それを使ってAkka Source
を初期化します。このように設定することは、モーメント化の問題を避けるべきです。大きなファイルに対して古いコードを実行し、Source
でforeach
を実行してOutOfMemory
状況を生成することができました。新しいコードに切り替えると、私はこの問題を回避することができました。
ここではscala.collection.immutable.Streamの使用が危険です。メモ処理(!)を使用しています(ドキュメントhttp://www.scala-lang.org/api/current/index.html#scala.collectionを参照)。 .immutable.Stream)ので、ファイル全体をストリーミング(!)するのではなく、メモリに保存することになります。 –
@ Konrad'ktoso'Malawski、優秀な点。私はmemoizationの問題を回避するためのアップデートを投稿します。 – cmbaxter
良い更新、入力ストリームのイテレータを公開して正常に動作します。ストリームが完了したらリソースを閉じることを忘れないでください。 –