2015-01-05 6 views
8

org.reactivestreamsライブラリを使用してJava NIO(高性能)を使用して大きなデータストリームを処理するいくつかのコード例がありますか?私は分散処理を目指しているので、Akkaを使った例が最高ですが、私はそれを理解することができます。NIOバイナリ処理にReactive Streamsを使用するにはどうすればよいですか?

は、まだケースのように思われる(私はすべてではない願っています)Source(非バイナリ)または直接のJava NIOにScalaのリゾート地でのファイルの読み込みの例(およびFiles.readAllBytesのような事も!)

おそらく最も私が見逃したアクチベータテンプレートがありますか?

答えて

4

私たちは、実際にバイナリファイルを処理するアッカストリームを使用(Akka Streams with Scala!は近い私がNIO /バイナリ側を除いて必要なものすべてに対処されます)。この周りの任意のドキュメントがなかったとして物事が軌道に乗るには少しトリッキーだったが、これは私たちが思い付いたものです:あなたが先に行くことができSource[Byte]アッカである、あなたはbinSourceを持っていたら

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile)) 
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream) 

とストリーム変換(mapflatMaptransformなど)を適用し始めます。この機能は、Iterableを受け取り、データを遅く読み込み、変換に使用できるようにするStreamというスカラを渡して、Sourceコンパニオンオブジェクトのapplyを利用します。

EDIT

コンラートは、コメント欄で指摘したように、流れが原因それを遅延ストリームを構築だとして、それが遭遇した要素のメモ化を行っているという事実に大きなファイルで問題となることがあります。これは、あなたが慎重でない場合、メモリ不足の状態につながる可能性があります。あなたがStreamのためにドキュメントを見ればしかし、メモリ内に構築メモ化を避けるためのヒントがあります:

一つは、メモ化の慎重でなければなりません。気をつけなければ大量のメモリを大量に大量に食べることができます。その理由は、 ストリームのメモは、 scala.collection.immutable.Listのような構造を作成するからです。何かが頭部を に保持している限り、頭部は尾部を保持しているので、再帰的に を続けます。一方、 の頭に何も保持されていない場合(たとえば、ストリームを定義するためにdefを使用した場合)、もう一度 が直接使用されなくなると、それは消えます。

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile))  
val binSource = Source(() => binStream(inputStream).iterator) 

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte) 

ので、ここでの考え方は、取得、その後すぐvalに割り当てるとdef経由Streamを構築していないことです。だから、次のようにあなたが私の元の例を修正することができ、アカウントにそれを取って

それからiteratorを使用して、それを使ってAkka Sourceを初期化します。このように設定することは、モーメント化の問題を避けるべきです。大きなファイルに対して古いコードを実行し、Sourceforeachを実行してOutOfMemory状況を生成することができました。新しいコードに切り替えると、私はこの問題を回避することができました。

+2

ここではscala.collection.immutable.Streamの使用が危険です。メモ処理(!)を使用しています(ドキュメントhttp://www.scala-lang.org/api/current/index.html#scala.collectionを参照)。 .immutable.Stream)ので、ファイル全体をストリーミング(!)するのではなく、メモリに保存することになります。 –

+0

@ Konrad'ktoso'Malawski、優秀な点。私はmemoizationの問題を回避するためのアップデートを投稿します。 – cmbaxter

+1

良い更新、入力ストリームのイテレータを公開して正常に動作します。ストリームが完了したらリソースを閉じることを忘れないでください。 –

7

scala.collection.immutable.Streamこのようなファイルを使用しないでください。なぜなら、メモ処理を実行しているからです。つまり、怠け者ですが、ストリーム全体をメモリにバッファリング(メモ)します。

これは間違いなくではありません。「ファイルをストリーム処理する」と思ったときに、あなたが望むものではありません。 ScalaのStreamがこのように機能する理由は、機能的な設定では完全な意味があるからです.Fibbonachiの数値を何度も何度も計算することを避けることができます。詳細はScalaDocを参照してください。

Akka Streamsはリアクションストリームの実装を提供し、ここで使用できるFileIOクラスを提供します(必要なときにのみデータを適切にバック圧着して残りのストリームを消費する準備ができています) :ここでは

import java.io._ 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.{ Sink, Source } 

object ExampleApp extends App { 


    implicit val sys = ActorSystem() 
    implicit val mat = FlowMaterializer() 

    FileIO.fromPath(Paths.get("/example/file.txt")) 
    .map(c ⇒ { print(c); c }) 
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() })) 
} 

が、これはアッカのバージョンを書いとして-の電流なので、2.5.xのシリーズのためのものであることをIO with Akka Streams ノートでの作業の詳細ドキュメントです。

+0

すばらしい答えをありがとう - 私は私が探していたものを知るために私自身の質問を再度見つけなければならなかった:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java /stream-io.html#Streaming_File_IO - akka 2.4がリリースされました(恐らくNI​​O 2という意味です)! (APIを使用してコードアンサーを更新/作成すると、私は受け入れます) – Stephen

+0

本当に常にストリーム全体がメモリに保持されますか?それともストリームの先頭への参照を保持しているかによって異なりますか?私の(欲しい?)印象は、あなたが繰り返し尾を処理して頭を忘れてしまうと、「ストリーム」アイテムが最終的に割り当てを解除されるということでした。 – dividebyzero

+0

ドキュメントを読んでください、私はそれらを下にリンクしました。 http://www.scala-lang.org/api/current/scala/collection/immutable/Stream.html –

関連する問題