Spark-Shell内でSpark/Spark SQlを使用してJSONブロブをデータフレームにフラット化します。Spark/Spark SQLを使用してJSONブロブをデータフレームにフラット化するには
私が行うときdf2.show // 3行
body
------------------------------------
{"k1": "v1", "k2": "v2" }
{"k3": "v3"}
{"k4": "v4", "k5": "v5", "k6": "v6"}
-------------------------------------
は今、私はこれらの行/レコードの十億を持っていますが、最大で5つの異なるJSONスキーマがあるだろうと言う示し
val df = spark.sql("select body from test limit 3"); // body is a json encoded blob column
val df2 = df.select(df("body").cast(StringType).as("body"))
すべての10億行今私はどのように私は以下の形式でデータフレームを取得するように平坦化するのですか? df.forEachまたはdf.forEachPartitionまたはdf.explodeまたはdf.flatMapを使用する必要がありますか? 10億のデータフレームを作成せず、それらのすべてまたは何かを非効率的に結合しようとしていることを確認するにはどうすればよいですか。私がコードサンプルを見ることができれば素晴らしいだろう。また、これはNilを持っている可能性があるので、彼らはスペースを取るかどうか疑問に思いますか?
import org.apache.spark.sql._
val rdd = df2.rdd.map { case Row(j: String) => j }
spark.read.json(rdd).show()
スパークSQL:あなたのような何かをするのであれば
:あなたがオンになっているが、この例を見てスパークのバージョン
"K1" | "K2" | "K3" | "K4" | "K5" | "K6"
---------------------------------------
"V1" | "V2" |
| "V3" |
| "V4" | "V5" | "V6"
これはうまくいきませんでした。私はSpark 2.0.2を使用しています。私は次のエラーが表示されます。エラー:オーバーロードされたメソッド値json代替: (jsonRDD:org.apache.spark.rdd.RDD [String])org.apache.spark.sql.DataFrame –
user1870400
これに変更するとspark.read.json(df2.toJSON.rdd).show()df2.showと同じ出力が得られるので、実際に何もしませんでした – user1870400
df2.rddによって返されたRDDのタイプは何ですか? – ImDarrenG