2010-11-23 21 views
35

Scala Streamで大きなCSVファイル(> 1 GB)を読むにはどうすればよいですか?コード例がありますか?それとも、大きなCSVファイルをメモリに読み込まずに読み込むために別の方法を使用しますか?Scala Streamクラスを使用して大きなCSVファイルを読むにはどうすればよいですか?

+0

遅延評価機能のようにストリームを意味しますか?おそらく、おそらく必要ですが、必須ではありませんか? - ファイルを行単位で読み込むことは、本質的に既にあります。私はまだScala ioの速度に敏感ではありませんが、getLines(ソースのクイックブラウズから)も怠惰な方法で実装されています - すべてのファイルをメモリに読み込みますか? –

+0

scala.Source.fromFile()を使用してからgetLines()を使用するときにOutOfMemoryExceptionが発生するので、メモリに読み込むと思います。したがって、Streamクラスを使用すると有効な代替のように聞こえるでしょう。 –

答えて

62

すでに述べたようにSource.fromFile(...).getLinesをちょうど使用してください。あなたがメモリを取得している場合は、すでに怠け者であるイテレータに、(あなたは再びそれらを読むことができるようにあなたは、あなたが以前に取得した値をメモ化するたかった遅延コレクションとしてストリームを使用したい)

を返し

問題は、の後、問題はのgetLinesの後ろにあるでしょう。厳密な収集を強制するtoListのような操作は問題を引き起こします。

+1

私はOutOfMemoryExceptionが本当にその後の操作によって引き起こされたと思います。ありがとう! –

+0

これは、ビジネスロジックが何かを計算するためにイテレータを何回かトラバースする必要がある場合、イテレータを扱うのが面倒かもしれません。あなたはイテレータを一度使うことができます。それはストリームを扱う方が良いと思われる。この質問のように:http://stackoverflow.com/questions/17004455/scala-iterator-and-stream-example-stream-fails-on-reuse – ses

10

Scalaのcollection.immutable.StreamがStreamであるとは限りません。これはではなく、です。ストリームは怠惰ですが、メモを取っています。

私はあなたが何をしようとしているのかわかりませんが、大量のメモリを使用せずにファイルを1行ずつ読むだけでうまくいくはずです。

getLinesは遅れて評価する必要があります(ファイルのサイズが2³²行以下の場合はクラッシュしないでください)。そうであれば#scalaに問い合わせるか、バグチケットを発行してください(あるいはその両方)。

3

ファイル全体の内容を一度にすべてのメモリにロードする必要を回避しながら、大きなファイルを1行ずつ処理する場合は、Iteratorscala.io.Sourceで返します。

私はちょうどこれらのタイプのユースケースに使用する小さな関数tryProcessSource(2つのサブ関数を含む)を持っています。関数は最大4つのパラメータをとりますが、最初のパラメータのみが必要です。その他のパラメータには、デフォルト値が用意されています。

ここ関数プロファイル(フル機能の実装が一番下にある)は次のとおり

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues), 
): Try[List[List[String]]] = { 
    ??? 
} 

最初のパラメータ、file: Fileが、必要とされます。また、CSVのような行指向のテキストファイルを指すjava.io.Fileの有効なインスタンスです。

第2のパラメータparseLine: (Int, String) => Option[List[String]]はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int,unparsedLine: String。そしてOption[List[String]]を返す。この関数は、有効な列の値で構成されたList[String]をラップしたSomeを返します。または、ストリーミングプロセス全体が早期に中断していることを示すNoneを返すことがあります。このパラメーターを指定しないと、デフォルト値の(index, line) => Some(List(line))が提供されます。このデフォルトでは、行全体が単一のString値として返されます。

第3のパラメータfilterLine: (Int, List[String]) => Option[Boolean]はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int,parsedValues: List[String]。そして、Option[Boolean]を返します。この特定の行を出力に含めるかどうかを示すラップされたBooleanを返します。または、ストリーミングプロセス全体が早期に中断していることを示すNoneを返すことがあります。このパラメーターを指定しないと、デフォルト値の(index, values) => Some(true)が提供されます。このデフォルトでは、すべての行が含まれます。

第4の最終パラメータretainValues: (Int, List[String]) => Option[List[String]]はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int,parsedValues: List[String]。そしてOption[List[String]]を返す。この関数は、サブタイプの一部および/または既存の列の値の変更で構成されるをラップされたSomeを返します。または、ストリーミングプロセス全体が早期に中断していることを示すNoneを返すことがあります。このパラメーターを指定しないと、デフォルト値(index, values) => Some(values)が提供されます。このデフォルト値は、第2パラメータであるparseLineによって解析された値になります。

次の内容のファイルを考えてみましょう(4行):tryLinesDefaultsため、この出力で

street,street2,city,state,zip 
100 Main Str,,Irving,TX,75039 
231 Park Ave,,Irving,TX,75039 
1400 Beltline Rd,Apt 312,Dallas,Tx,75240 

次の呼び出しのプロファイルが...

val tryLinesDefaults = 
    tryProcessSource(new File("path/to/file.csv")) 

...結果(ファイルの変更されていない内容):

Success(
    List(
    List("street,street2,city,state,zip"), 
    List("100 Main Str,,Irving,TX,75039"), 
    List("231 Park Ave,,Irving,TX,75039"), 
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240") 
) 
) 

以下呼び出すプロフィール...

val tryLinesParseOnly = 
    tryProcessSource(
     new File("path/to/file.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
) 

... tryLinesParseOnlyため、この出力結果(各行は、個々の列の値に解析):

Success(
    List(
    List("street","street2","city","state","zip"), 
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240") 
) 
) 

呼び出しプロファイルに続いて...

val tryLinesIrvingTxNoHeader = 
    tryProcessSource(
     new File("C:/Users/Jim/Desktop/test.csv") 
    , parseLine = 
     (index, unparsedLine) => Some(unparsedLine.split(",").toList) 
    , filterLine = 
     (index, parsedValues) => 
      Some(
      (index != 0) && //skip header line 
      (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving 
      (parsedValues(3).toLowerCase == "Tx".toLowerCase) 
     ) 
) 

... tryLinesIrvingTxNoHeaderため、この出力結果(個々の列の値、無ヘッダ及びテキサス州にある2つのだけの行に解析各行):

Success(
    List(
    List("100 Main Str","","Irving,TX","75039"), 
    List("231 Park Ave","","Irving","TX","75039"), 
) 
) 

ここ全体tryProcessSource関数の実装の:

import scala.io.Source 
import scala.util.Try 

import java.io.File 

def tryProcessSource(
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    retainValues: (Int, List[String]) => Option[List[String]] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[List[String]]] = { 
    def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] = 
    try {Try(transfer(source))} finally {source.close()} 
    def recursive(
    remaining: Iterator[(String, Int)], 
    accumulator: List[List[String]], 
    isEarlyAbort: Boolean = 
     false 
): List[List[String]] = { 
    if (isEarlyAbort || !remaining.hasNext) 
     accumulator 
    else { 
     val (line, index) = 
     remaining.next 
     parseLine(index, line) match { 
     case Some(values) => 
      filterLine(index, values) match { 
      case Some(keep) => 
       if (keep) 
       retainValues(index, values) match { 
        case Some(valuesNew) => 
        recursive(remaining, valuesNew :: accumulator) //capture values 
        case None => 
        recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
       } 
       else 
       recursive(remaining, accumulator) //discard row 
      case None => 
       recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
      } 
     case None => 
      recursive(remaining, accumulator, isEarlyAbort = true) //early abort 
     } 
    } 
    } 
    Try(Source.fromFile(file)).flatMap(
    bufferedSource => 
     usingSource(bufferedSource) { 
     source => 
      recursive(source.getLines().buffered.zipWithIndex, Nil).reverse 
     } 
) 
} 

このソリューションは比較的簡潔ですが、それは私にかなり時間がかかったと私は最終的にここに得ることができたの前に多くのリファクタリングが渡されます。改善の余地がある場合は教えてください。


更新:私はちょうどit's own StackOverflow questionとして問題を尋ねました。そして、今度はhas an answer fixing the errorと言います。

私はこれをもっと汎用的にして、retainValuesパラメータをtransformLineに変更し、以下の新しいgenerics-ified関数定義を変更しました。しかし、IntelliJでハイライトエラーが発生しています。「一部の[List [String]型が期待される型[Option] [A]」に適合しないため、デフォルト値を変更する方法を見つけることができませんでした去る。

def tryProcessSource2[A <: AnyRef](
    file: File, 
    parseLine: (Int, String) => Option[List[String]] = 
    (index, unparsedLine) => Some(List(unparsedLine)), 
    filterLine: (Int, List[String]) => Option[Boolean] = 
    (index, parsedValues) => Some(true), 
    transformLine: (Int, List[String]) => Option[A] = 
    (index, parsedValues) => Some(parsedValues) 
): Try[List[A]] = { 
    ??? 
} 

どのようにこの作品を作成するための助力があれば幸いです。

関連する問題