2016-08-12 11 views
2

ネストされたキーと値のペアを持つ大きな圧縮jsonファイルがあります。 jsonオブジェクトには約70-80個のキー(およびサブキー)がありますが、私はいくつかのキーにしか興味がありません。私はSpark SQLでjsonファイルを照会したいだけで、私が興味のあるキーと値のペアを選んで、それらをcsvファイルのセットに出力します。 170MBの圧縮jsonファイルを処理するのに約5分かかります。私はこのプロセスを最適化する方法があるのか​​どうか疑問に思っています。あるいは、この種の仕事のためにスパーク以外の優れたツールがありますか?ありがとう!ここでSparkでjsonファイルを高速処理する方法

は私が使っていたScalaのコードのスナップショットです:

val data = sc.textFile("abcdefg.txt.gz") 
// repartition the data 
val distdata = data.repartition(10) 
val dataDF = sqlContext.read.json(distdata) 
// register a temp table 
dataDF.registerTempTable("pixels") 

// query the json file, grab columns of interest 
val query = 
""" 
    |SELECT col1, col2, col3, col4, col5 
    |FROM pixels 
    |WHERE col1 IN (col1_v1, col1_v2, ...) 
""".stripMargin 
val result = sqlContext.sql(query) 

// reformat the timestamps 
val result2 = result.map(
    row => { 
    val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ") 
    Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7), 
     row(8), row(9), row(10), row(11)) 
    } 
) 
// output the result to a csv and remove the square bracket in each row 
val output_file = "/root/target" 
result2.map(row => row.mkString(",")).saveAsTextFile(output_file) 
+0

私は、メートルほとんどの時間を推測を解凍し、書き込み/読み出しに行き、できません並列化される。ジョブを配布して結果を収集するオーバーヘッドを追加してください。そして私の推測では、Sparkを使ってここであなたの速度を落としています。なぜ、パースされていない行の '再分割 'ですか? –

+0

あなたのデータを変換したいだけなら。あなたはSparkSQLのすべての機能を必要としません。ちょうどRDDに固執する。 jsonを解析するには、PlayJsonのような高速のjsonライブラリを使用します。変更してダンプします。 –

+0

明示的に要求されない限り、RDDでの再パーティションは行わないでください。 –

答えて

2

は、あなたのJSONデータが今

{ "c1": "timestamp_1", "c2": "12", "c3": "13", "c": "14", "c5": "15", ... } 
{ "c1": "timestamp_1", "c2": "22", "c3": "23", "c": "24", "c5": "25", ... } 
{ "c1": "timestamp_1", "c2": "32", "c3": "33", "c": "34", "c5": "35", ... } 

、次のようになり、あなたはJSONのLIBを使用し、RDDのにことができると言うことができます変換ダンプを実行します。

import play.api.libs.json._ 

val data = sc.textFile("abcdefg.txt.gz") 

val jsonData = data.map(line => Json.parse(line)) 

// filter the rdd and just keep the values of interest 
val filteredData = data 
    .filter(json => { 
    val c1 = (json \ "c1").as[String] 
    List[String]("c1_val1", "c2_val2", ...).contains(c1) 
    }) 

    // reformat the timestamps and transform to tuple 
val result2 = filteredData 
    .map(json => { 
    val ts = (json \ "c1").as[String] 
    val tsFormated = ts.stripSuffix("Z").replace("T"," ") 
    (tsFormated, (json \ "c2").as[String], ...) 
    }) 

val output_file = "/root/target" 

result2.saveAsTextFile(output_file) 
0

それはJSONを処理する簡単な方法です:

 val path = "examples/src/main/resources/people.json" 
     val peopleDF = spark.read.json(path) 

     peopleDF.printSchema() 

     peopleDF.createOrReplaceTempView("people") 

     val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() 

     val otherPeopleRDD = spark.sparkContext.makeRDD( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() 

は、ドキュメントを参照してください。http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

+0

この回答は役に立ちましたか? – pacman

関連する問題