2016-11-17 6 views
0

Spark-Shell内でSpark/Spark SQlを使用してJSONブロブをデータフレームにフラット化します。Spark/Spark SQLを使用してJSONブロブをデータフレームにフラット化するには

私が行うとき

df2.show // 3行

body 

------------------------------------ 

{"k1": "v1", "k2": "v2" } 

{"k3": "v3"} 

{"k4": "v4", "k5": "v5", "k6": "v6"} 

------------------------------------- 

は今、私はこれらの行/レコードの十億を持っていますが、最大で5つの異なるJSONスキーマがあるだろうと言う示し

val df = spark.sql("select body from test limit 3"); // body is a json encoded blob column 
val df2 = df.select(df("body").cast(StringType).as("body")) 

すべての10億行今私はどのように私は以下の形式でデータフレームを取得するように平坦化するのですか? df.forEachまたはdf.forEachPartitionまたはdf.explodeまたはdf.flatMapを使用する必要がありますか? 10億のデータフレームを作成せず、それらのすべてまたは何かを非効率的に結合しようとしていることを確認するにはどうすればよいですか。私がコードサンプルを見ることができれば素晴らしいだろう。また、これはNilを持っている可能性があるので、彼らはスペースを取るかどうか疑問に思いますか?

import org.apache.spark.sql._ 
val rdd = df2.rdd.map { case Row(j: String) => j } 
spark.read.json(rdd).show() 

スパークSQL:あなたのような何かをするのであれば

Spark SQL JSON

:あなたがオンになっているが、この例を見てスパークのバージョン

"K1" | "K2" | "K3" | "K4" | "K5" | "K6" 
--------------------------------------- 
"V1" | "V2" | 
      | "V3" | 
        | "V4" | "V5" | "V6" 

答えて

1

わかりません重い吊り上げをするでしょう。

+0

これはうまくいきませんでした。私はSpark 2.0.2を使用しています。私は次のエラーが表示されます。エラー:オーバーロードされたメソッド値json代替: (jsonRDD:org.apache.spark.rdd.RDD [String])org.apache.spark.sql.DataFrame user1870400

+0

これに変更するとspark.read.json(df2.toJSON.rdd).show()df2.showと同じ出力が得られるので、実際に何もしませんでした – user1870400

+0

df2.rddによって返されたRDDのタイプは何ですか? – ImDarrenG

関連する問題