ネストされたキーと値のペアを持つ大きな圧縮jsonファイルがあります。 jsonオブジェクトには約70-80個のキー(およびサブキー)がありますが、私はいくつかのキーにしか興味がありません。私はSpark SQLでjsonファイルを照会したいだけで、私が興味のあるキーと値のペアを選んで、それらをcsvファイルのセットに出力します。 170MBの圧縮jsonファイルを処理するのに約5分かかります。私はこのプロセスを最適化する方法があるのかどうか疑問に思っています。あるいは、この種の仕事のためにスパーク以外の優れたツールがありますか?ありがとう!ここでSparkでjsonファイルを高速処理する方法
は私が使っていたScalaのコードのスナップショットです:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
row => {
val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
row(8), row(9), row(10), row(11))
}
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
私は、メートルほとんどの時間を推測を解凍し、書き込み/読み出しに行き、できません並列化される。ジョブを配布して結果を収集するオーバーヘッドを追加してください。そして私の推測では、Sparkを使ってここであなたの速度を落としています。なぜ、パースされていない行の '再分割 'ですか? –
あなたのデータを変換したいだけなら。あなたはSparkSQLのすべての機能を必要としません。ちょうどRDDに固執する。 jsonを解析するには、PlayJsonのような高速のjsonライブラリを使用します。変更してダンプします。 –
明示的に要求されない限り、RDDでの再パーティションは行わないでください。 –