2015-01-13 6 views
5

このjsonファイルをハイブテーブルに読み込もうとしていますが、ここではトップレベルキー1,2 ...は一貫していません。jsonキー値とhive/sqlとsparkを読み取る

{ 
    "1":"{\"time\":1421169633384,\"reading1\":130.875969,\"reading2\":227.138275}", 
    "2":"{\"time\":1421169646476,\"reading1\":131.240628,\"reading2\":226.810211}", 
    "position": 0 
} 

列が位置を無視するように私は私のハイブテーブルに時間と測定値1,2を必要としています。 ハイブクエリとスパークマップコンビネーションコードを組み合わせることもできます。 ありがとうございました。

Exception in thread "main" org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: SELECT json_val from temp_hum_table lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val 
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
+0

出力がどのように見えるかの例は非常に役立ちます。 – gobrewers14

+0

出力テーブルの例: '' time "、" reading1 "、" reading2 "\ n 1421169633384,130.875969,227.138275 \ n 1421169646476,131.240628,226.810211 – venuktan

答えて

4

これは働くだろう、あなたは "1" と "2"(キー名)の名前を変更する場合:ここ

アップデートは、それは次のようなエラーがスローされます私は

val hqlContext = new HiveContext(sc) 

val rdd = sc.textFile(data_loc) 

val json_rdd = hqlContext.jsonRDD(rdd) 
json_rdd.registerTempTable("table123") 
println(json_rdd.printSchema()) 
hqlContext.sql("SELECT json_val from table123 lateral view explode_map(json_map(*, 'int,string')) x as json_key, json_val ").foreach(println) 

をしようとしていますです

val resultrdd = sqlContext.sql("SELECT x1.time, x1.reading1, x1.reading1, x2.time, x2.reading1, x2.reading2 from table123 ") 
resultrdd.flatMap(row => (Array((row(0),row(1),row(2)), (row(3),row(4),row(5))))) 

これはあなたの時間を持つタプルのRDDを与えるだろう、reading1と:「X1」と(JSONファイル内またはRDDで)「X2」へreading2。あなたがSchemaRDDが必要な場合は、このように、flatMap変換内側ケースクラスにマッピングします:

case class Record(time: Long, reading1: Double, reading2: Double) 
resultrdd.flatMap(row => (Array(Record(row.getLong(0),row.getDouble(1),row.getDouble(2)), 
     Record(row.getLong(3),row.getDouble(4),row.getDouble(5)) ))) 
val schrdd = sqlContext.createSchemaRDD(resultrdd) 

更新:多くのネストされたキーの場合は

、あなたが行を解析することができます

val allrdd = sqlContext.sql("SELECT * from table123") 
allrdd.flatMap(row=>{ 
    var recs = Array[Record](); 
    for(col <- (0 to row.length-1)) { 
     row(col) match { 
      case r:Row => recs = recs :+ Record(r.getLong(2),r.getDouble(0),r.getDouble(1)); 
      case _ => ; 
     } 
    }; 
    recs 
}) 
+0

キーは1,2 ... 240になります。したがって、x1.timeなどを実行すると機能しないことがあります。 – venuktan

+0

私の回答が – pzecevic

+0

に更新されました。ここで私は 'allrdd.registerTempTable(" vals ");というエラーをスローしました。 sqlContext.sql( "vals LIMIT 10からreading1を選択").collect.foreach(println) '私は何か不足していますか? – venuktan

関連する問題