Scala Streamで大きなCSVファイル(> 1 GB)を読むにはどうすればよいですか?コード例がありますか?それとも、大きなCSVファイルをメモリに読み込まずに読み込むために別の方法を使用しますか?Scala Streamクラスを使用して大きなCSVファイルを読むにはどうすればよいですか?
答えて
すでに述べたようにSource.fromFile(...).getLines
をちょうど使用してください。あなたがメモリを取得している場合は、すでに怠け者であるイテレータに、(あなたは再びそれらを読むことができるようにあなたは、あなたが以前に取得した値をメモ化するたかった遅延コレクションとしてストリームを使用したい)
を返し
問題は、の後、問題はのgetLinesの後ろにあるでしょう。厳密な収集を強制するtoList
のような操作は問題を引き起こします。
私はOutOfMemoryExceptionが本当にその後の操作によって引き起こされたと思います。ありがとう! –
これは、ビジネスロジックが何かを計算するためにイテレータを何回かトラバースする必要がある場合、イテレータを扱うのが面倒かもしれません。あなたはイテレータを一度使うことができます。それはストリームを扱う方が良いと思われる。この質問のように:http://stackoverflow.com/questions/17004455/scala-iterator-and-stream-example-stream-fails-on-reuse – ses
Scalaのcollection.immutable.Stream
がStreamであるとは限りません。これはではなく、です。ストリームは怠惰ですが、メモを取っています。
私はあなたが何をしようとしているのかわかりませんが、大量のメモリを使用せずにファイルを1行ずつ読むだけでうまくいくはずです。
getLines
は遅れて評価する必要があります(ファイルのサイズが2³²行以下の場合はクラッシュしないでください)。そうであれば#scalaに問い合わせるか、バグチケットを発行してください(あるいはその両方)。
ファイル全体の内容を一度にすべてのメモリにロードする必要を回避しながら、大きなファイルを1行ずつ処理する場合は、Iterator
をscala.io.Source
で返します。
私はちょうどこれらのタイプのユースケースに使用する小さな関数tryProcessSource
(2つのサブ関数を含む)を持っています。関数は最大4つのパラメータをとりますが、最初のパラメータのみが必要です。その他のパラメータには、デフォルト値が用意されています。
ここ関数プロファイル(フル機能の実装が一番下にある)は次のとおり
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
最初のパラメータ、file: File
が、必要とされます。また、CSVのような行指向のテキストファイルを指すjava.io.File
の有効なインスタンスです。
第2のパラメータparseLine: (Int, String) => Option[List[String]]
はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int
,unparsedLine: String
。そしてOption[List[String]]
を返す。この関数は、有効な列の値で構成されたList[String]
をラップしたSome
を返します。または、ストリーミングプロセス全体が早期に中断していることを示すNone
を返すことがあります。このパラメーターを指定しないと、デフォルト値の(index, line) => Some(List(line))
が提供されます。このデフォルトでは、行全体が単一のString
値として返されます。
第3のパラメータfilterLine: (Int, List[String]) => Option[Boolean]
はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int
,parsedValues: List[String]
。そして、Option[Boolean]
を返します。この特定の行を出力に含めるかどうかを示すラップされたBoolean
を返します。または、ストリーミングプロセス全体が早期に中断していることを示すNone
を返すことがあります。このパラメーターを指定しないと、デフォルト値の(index, values) => Some(true)
が提供されます。このデフォルトでは、すべての行が含まれます。
第4の最終パラメータretainValues: (Int, List[String]) => Option[List[String]]
はオプションです。また、提供されている場合は、2つの入力パラメータを受け取ることを期待する関数でなければなりません。 index: Int
,parsedValues: List[String]
。そしてOption[List[String]]
を返す。この関数は、サブタイプの一部および/または既存の列の値の変更で構成されるをラップされたSome
を返します。または、ストリーミングプロセス全体が早期に中断していることを示すNone
を返すことがあります。このパラメーターを指定しないと、デフォルト値(index, values) => Some(values)
が提供されます。このデフォルト値は、第2パラメータであるparseLine
によって解析された値になります。
次の内容のファイルを考えてみましょう(4行):tryLinesDefaults
ため、この出力で
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
次の呼び出しのプロファイルが...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
...結果(ファイルの変更されていない内容):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
以下呼び出すプロフィール...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
... tryLinesParseOnly
ため、この出力結果(各行は、個々の列の値に解析):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
呼び出しプロファイルに続いて...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
... tryLinesIrvingTxNoHeader
ため、この出力結果(個々の列の値、無ヘッダ及びテキサス州にある2つのだけの行に解析各行):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
ここ全体tryProcessSource
関数の実装の:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
このソリューションは比較的簡潔ですが、それは私にかなり時間がかかったと私は最終的にここに得ることができたの前に多くのリファクタリングが渡されます。改善の余地がある場合は教えてください。
更新:私はちょうどit's own StackOverflow questionとして問題を尋ねました。そして、今度はhas an answer fixing the errorと言います。
私はこれをもっと汎用的にして、retainValues
パラメータをtransformLine
に変更し、以下の新しいgenerics-ified関数定義を変更しました。しかし、IntelliJでハイライトエラーが発生しています。「一部の[List [String]型が期待される型[Option] [A]」に適合しないため、デフォルト値を変更する方法を見つけることができませんでした去る。
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
どのようにこの作品を作成するための助力があれば幸いです。
- 1. 大きなCSVファイルを扱うにはどうすればいいですか?
- 2. 大きなCSVファイルを読む
- 3. Javaを使用して大きなXMLファイルを解析するにはどうすればよいですか?
- 4. 大きなファイルの5行目を読むにはどのような機能を使用しますか?
- 5. slf4jを使用して.propertiesファイルを読むにはどうすればよいですか?
- 6. 巨大な.csvファイルを読む
- 7. カンマをたくさん含む変数「コメント」を使用してCSVファイルを読むにはどうすればよいですか?
- 8. RubyでCSVを読むときにヘッダー行をスキップするにはどうすればよいですか?
- 9. Rにアポストロフィを含む.csvファイルを読み取るにはどうすればよいですか?
- 10. iPhoneで大規模なUTF-8ファイルを読むにはどうすればよいですか?
- 11. エスケープされていないエンクロージャを使用してCSVファイルを読む
- 12. DOMを使用してCSVファイルを正しく解析するにはどうすればよいですか?
- 13. SQLalchemyを使用して大きなファイルを読む
- 14. 「Galaxy」を使用してCSVファイルからアイテムをインポートするにはどうすればよいですか?
- 15. Play Scalaでstringをenumクラスとして読み取るにはどうすればよいですか?
- 16. PHPを使用して変数列をCSVファイルにエクスポートするにはどうすればよいですか?
- 17. Stream-cons#はどのようにScalaで翻訳されていますか?
- 18. 大きなJSONリストをPythonで読み込むにはどうすればよいですか?
- 19. C#.netを使用してMemoryStreamからダウンロードしたCSVファイルを読むにはどうすればよいですか?
- 20. C#でSNMP MIBファイルを読むにはどうすればよいですか?
- 21. RでNCSSファイルを読むにはどうすればよいですか?
- 22. C++でiniファイルを読むにはどうすればよいですか?
- 23. ローカルテキストファイルを読むにはどうすればよいですか?
- 24. forループを使用して複数のファイルを書き込むにはどうすればよいですか?
- 25. PHPファイルapiを使用して生のバイトを書き込むにはどうすればよいですか?
- 26. VBを使用してCSVファイルをデザインするにはどうすればよいですか?
- 27. Perlを使用してCSVファイルを作成するにはどうすればよいですか?
- 28. phpMyAdminを使用して大きなcsvファイルをインポートします。
- 29. .infファイルを読むにはどうすればよいですか?
- 30. Scalaでインスタンス化されていないクラスを参照するにはどうすればよいですか?
遅延評価機能のようにストリームを意味しますか?おそらく、おそらく必要ですが、必須ではありませんか? - ファイルを行単位で読み込むことは、本質的に既にあります。私はまだScala ioの速度に敏感ではありませんが、getLines(ソースのクイックブラウズから)も怠惰な方法で実装されています - すべてのファイルをメモリに読み込みますか? –
scala.Source.fromFile()を使用してからgetLines()を使用するときにOutOfMemoryExceptionが発生するので、メモリに読み込むと思います。したがって、Streamクラスを使用すると有効な代替のように聞こえるでしょう。 –