2016-12-11 13 views
2

これは、Confluentプラットフォームで直列化されたAvrosです。私はこのような実施例を見つけたいSpark Streamingを使用して、Kafkaからバイナリシリアル化されたAvro(Confluent Platform)を読むにはどうすればいいですか?

https://github.com/seanpquig/confluent-platform-spark-streaming/blob/master/src/main/scala/example/StreamingJob.scala

が、スパーク構造化されたストリーミング用

kafka 
    .select("value") 
    .map { row => 

    // this gives me test == testRehydrated  
    val test = Foo("bar") 
    val testBytes = AvroWriter[Foo].toBytes(test) 
    val testRehydrated = AvroReader[Foo].fromBytes(testBytes) 


    // this yields mangled Foo data 
    val bytes = row.getAs[Array[Byte]]("value") 
    val rehydrated = AvroReader[Foo].fromBytes(bytes) 
+0

解決策を見つけましたか? – aasthetic

+0

@aasthetic以下を参照 – zzztimbo

答えて

1

私はConfluentプラットフォームデコーダを使用する必要があると考えました。

def decoder: io.confluent.kafka.serializers.KafkaAvroDecoder = { 
    val props = new Properties() 
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) 
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") 
    val vProps = new kafka.utils.VerifiableProperties(props) 
    new io.confluent.kafka.serializers.KafkaAvroDecoder(vProps) 
} 
+0

ありがとう@zzztimbo、私はまた私のキーのスキーマを登録しました、彼らはロングタイプでなければなりません。私はSpark Streamingでデシリアライズできません。何か案が? – aasthetic

+1

kafkaトピック+スキーマレジストリを使用して、構造化ストリーミングを設定する方法の完全な例を得ることは役に立ちます。このデコーダで何をすべきかはわかりません。 –

関連する問題