2016-07-19 9 views
0

問題:オブジェクトがシリアライズできないSpark Avroから寄木細工記者

問題を解決する方法をご覧ください。適切に印刷するように正しく読み取ることができます。しかしによって引き起こさ

直列化可能ではない

オブジェクト取得寄木細工したレコードを書き込み中:java.io.NotSerializableException: parquet.avro.AvroParquetWriterシリアル化スタック: - オブジェクトではありません 、シリアライズ(クラス:parquet.avroを.AvroParquetWriter、値: [email protected]

を見直し、私はそれを行うための最善の方法であるかを教えてください。

コード:あなたはあなたがアプローチを取っている理由Covertingアブロ・レコードは、私はよく分からない

val records = sc.newAPIHadoopRDD(conf.getConfiguration, 
    classOf[AvroKeyInputFormat[GenericRecord]], 
    classOf[AvroKey[GenericRecord]], //Transforms the PairRDD to RDD 
    classOf[NullWritable]).map(x => x._1.datum) 

    // Build a schema 
    val schema = SchemaBuilder 
    .record("x").namespace("x") 
    .fields 
    .name("x").`type`().stringType().noDefault() 
    .endRecord 

val parquetWriter = new AvroParquetWriter[GenericRecord](new Path(outPath), schema) 

val parquet = new GenericRecordBuilder(schema) 

records.foreach { keyVal => 
    val x = keyVal._1.datum().get("xyz") -- Field 
    parquet.set("x", x) 
     .build 
     parquetWriter.write(schema.build()) 
    } 

答えて

0

寄木細工します。しかし、私は別のアプローチをお勧めします。 avroファイルをrddに入れたら、それはあなたのように見えます。また、スキーマを作成してRDDをデータフレームに変換し、データフレームを寄木張りで書き出すことができます。

var avroDF = sqlContext.createDataFrame(avroRDD,avroSchema) 
avroDF 
    .write 
    .mode(SaveMode.Overwrite) 
    .parquet("parquet directory to write file") 
+0

感謝を読むためにここに開始することができます。しかし、これは配列、リスト、マップの入れ子構造です。非常に大きなネストされたavro。すべての要素をループして必要なものを得る必要があります。 – Ankur

+0

あなたがupvotedし、これらの回答の1つを受け入れるといいです。あなたが聞いたすべての質問に答えました。 @Ankur – mark

0

複雑な構造と配列を持つ複雑なJsonの中には、ハイブql側面図が爆発するものがあります。次に、平坦化された複雑なjsonの例を示します。それは10行から始まり、いくつかのトレースでは60行を得ることができ、いくつかは5未満になります。それは単にどのように爆発するかによって異なります。

val tenj = sqlContext.read.json("file:///home/marksmith/hive/Tenfile.json") 

scala> tenj.printSchema 
root 

|-- DDIVersion: string (nullable = true) 
|-- EndTimestamp: string (nullable = true) 
|-- Stalls: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- Stall: long (nullable = true) 
| | |-- StallType: string (nullable = true) 
| | |-- TraceTypes: struct (nullable = true) 
| | | |-- ActiveTicket: struct (nullable = true) 
| | | | |-- Category: string (nullable = true) 
| | | | |-- Traces: array (nullable = true) 
| | | | | |-- element: struct (containsNull = true) 
| | | | | | |-- EndTime: string (nullable = true) 
| | | | | | |-- ID: string (nullable = true) 
| | | | | | |-- Source: string (nullable = true) 
| | | | | | |-- StartPayload: struct (nullable = true) 
| | | | | | | |-- SubticketID: string (nullable = true) 
| | | | | | | |-- TicketID: string (nullable = true) 
| | | | | | | |-- TicketState: long (nullable = true) 
| | | | | | |-- StartTime: string (nullable = true) 

tenj.registerTempTable("ddis") 


val sat = sqlContext.sql(
    "select DDIVersion, StallsExp.stall, StallsExp.StallType, at.EndTime, at.ID, 
     at.Source, at.StartPayload.SubTicketId, at.StartPayload.TicketID, 
     at.StartPayload.TicketState, at.StartTime 
    from ddis 
     lateral view explode(Stalls) st as StallsExp 
     lateral view explode(StallsExp.TraceTypes.ActiveTicket.Traces) at1 as at") 
sat: org.apache.spark.sql.DataFrame = [DDIVersion: string, stall: bigint, StallType: string, EndTime: string, ID: string, Source: string, SubTicketId: string, TicketID: string, TicketState: bigint, StartTime: string] 

sat.count 
res22: Long = 10 

sat.show 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
|DDIVersion|stall|StallType|    EndTime| ID|Source|SubTicketId|TicketID|TicketState|   StartTime| 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
| 5.3.1.11| 15| POPS4|2016-06-08T20:07:...| | STALL|   0|  777|   1|2016-06-08T20:07:...| 
| 5.3.1.11| 14| POPS4|2016-06-08T20:07:...| | STALL|   0|  384|   1|2016-06-08T20:06:...| 
| 5.3.1.11| 13| POPS4|2016-06-08T20:07:...| | STALL|   0| 135792|   1|2016-06-08T20:06:...| 
| 5.0.0.28| 26| POPS4|2016-06-08T20:06:...| | STALL|   0|  774|   2|2016-06-08T20:03:...| 
+0

ありがとうございました。ネストされたavroを読み込み、いくつかの特定のカラムを取得し、それをParquetフォーマットにダンプする方法を提供できますか? – Ankur

1

あなたは、データフレームにアブロにアプローチの https://github.com/databricks/spark-avro

// import needed for the .avro method to be added 
import com.databricks.spark.avro._ 

val sqlContext = new SQLContext(sc) 

// The Avro records get converted to Spark typesca 
val df = sqlContext.read.avro("src/test/resources/episodes.avro") 

df.registerTempTable("tempTable") 
val sat = sqlContext.sql(//use lateral view explode) 
sat.write.parquet("/tmp/output") 
+0

'//側面ビューを使用する目的は何ですか?なぜこれが必要ですか? –

+0

あなたは3つのものの配列である列を持っています。側面図を使用することで、その行を平坦化できますが、3行になります。すべての列は、配列である列を除いて同じになります。これには3つの異なる値があります。 – mark

+0

「爆発する」だけでは(横から見ない)?私は単独で「爆発」でそれをやり遂げることができ、なぜ横から見たのかを疑問に思っていました。 'Dataset.flatMap'も使用できます。 –

関連する問題