2016-10-04 9 views
0

hereのように、MongoからSparkへのダンプをbsonにロードしています。それは動作しますが、私が取得することである:BSONObjectからDataFrameへのRDD

org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)]

それは基本的にすべてのStringフィールドでちょうどJSONでなければなりません。残りのコードでは、データを操作するためにDataFrameオブジェクトが必要です。しかし、もちろん、toDFはそのRDDで失敗します。すべてのフィールドをStringとしてSpark DataFrameに変換するにはどうすればよいですか? spark.read.jsonと似たようなものがあれば素晴らしいでしょう。

答えて

0
val datapath = "path_to_bson_file.bson" 

import org.apache.hadoop.conf.Configuration 

// Set up the configuration for reading from bson dump. 
val bsonConfig = new Configuration() 
bsonConfig.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat") 

// given with your spark session 
implicit lazy val sparkSession = initSpark() 

// read the RDD[org.bson.BSONObject] 
val bson_data_as_json_string = sparkSession.sparkContext.newAPIHadoopFile(datapath, 
    classOf[com.mongodb.hadoop.BSONFileInputFormat]. 
    asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, org.bson.BSONObject]]), 
    classOf[Object], 
    classOf[org.bson.BSONObject], 
    bsonConfig). 
    map{row => { 
    // map BSON object to JSON string 
    val json = com.mongodb.util.JSON.serialize(row._2) 
    json 
    } 
} 

// read into JSON spark Dataset: 
val bson_data_as_json_dataset = sparkSession.sqlContext.read.json(bson_data_as_json_string) 
// eval the schema: 
bson_data_as_json_dataset.printSchema() 
関連する問題