2017-10-19 9 views
1

私はスパークするのが新です。私は構造化されたストリーミングを使用してカフカからデータを読み込みます。スパーク(2.2):構造化ストリーミングを使用してカフカからデリシアライズの倹約記録

私はScalaでこのコードを使用してデータを読み取ることができます。

val data = spark.readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", brokers) 
     .option("subscribe", topics) 
     .option("startingOffsets", startingOffsets) 
     .load() 

値列での私のデータは、スリフトレコードです。ストリーミングAPIはバイナリ形式のデータを提供します。データを文字列またはjsonにキャストする例がありますが、データをデシリアライズする方法の例はありません。

どうすればこの問題を解決できますか?

答えて

0

このブログはdatabricksのWebサイトで見つかりました。 Apache Kafkaの複雑なデータストリームを消費および変換するためにSpark SQLのAPIをどのように活用できるかを示しています。

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

UDFは、デシリアライザの行に使用することができる方法を説明するセクションがあります:

object MyDeserializerWrapper { 
    val deser = new MyDeserializer 
} 
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
    MyDeserializerWrapper.deser.deserialize(topic, bytes) 
) 

df.selectExpr("""deserialize("topic1", value) AS message""") 

私はJavaを使用して、したがって、どのようにそれができる確認するには、次のサンプルUDFを書くためにしなければならなかったのですJavaで呼び出すこと:

UDF1<byte[], String> mode = new UDF1<byte[], String>() { 
      @Override 
      public String call(byte[] bytes) throws Exception { 
       String s = new String(bytes); 
       return "_" + s; 
      } 
     }; 

次のように今私は、例を数える構造化されたストリーミング・ワードでこのUDFを使用することができます。

Dataset<String> words = df 
       //converted the DataFrame to a Dataset of String using .as(Encoders.STRING()) 
//    .selectExpr("CAST(value AS STRING)") 
       .select(callUDF("mode", col("value"))) 
       .as(Encoders.STRING()) 
       .flatMap(
         new FlatMapFunction<String, String>() { 
          @Override 
          public Iterator<String> call(String x) { 
           return Arrays.asList(x.split(" ")).iterator(); 
          } 
         }, Encoders.STRING()); 

私のための次のステップは、除算デシリアライズ用のUDFを作成することです。私はすぐにそれを投稿します。

関連する問題