2016-07-13 6 views
4

RDDを変換するときにprotobufフィールドのシリアル化に関するスパークジョブを実行中にエラーが発生しました。ProtoBufフィールドのSpark、Kryoシリアル化の問題

com.esotericsoftware.kryo.KryoException:java.lang.UnsupportedOperationExceptionが シリアルトレース: otherAuthors_(com.thomsonreuters.kraken.medusa.dbor.proto.Book $ DBBooks)

エラーが作成されているように見えますこの時点で:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { 
     tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) &&  !isBookPublished(o)).mapPartitions(it => 
     it.map{ord => 
     (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))})) 
} 

val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) => 
    opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue() 
} 

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) => 
    opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue() 
} 

フィールドは以下のようにいるProtobufで指定されたリストです:

otherAuthors_ = java.util.Collections.emptyList() 

コードが実際にはBook Protobufのそのフィールドを利用しているわけではありませんが、まだネットワーク経由で送信されています。

誰にもこれに関するアドバイスがありますか?

答えて

0

OK、古い質問ですが、ここに次世代の答えです。デフォルトのkryoシリアライザは、一部のコレクションではうまく動作しません。それに役立つサードパーティのライブラリがあります:あなたはおそらくスパーク設定を作成するときにカスタムkryoレジを提供する必要があり、あなたの場合はkryo-serializers

あなたのレジで必要なカスタム登録して
val conf = new SparkConf() 
conf.set("spark.kryo.registrator", "MyKryoRegistrator") 

class MyKryoRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) { 
     kryo.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer()); 
     // Probably should use proto serializer for your proto classes 
     kryo.register(Book.class, new ProtobufSerializer()); 
    } 
} 
関連する問題