2016-08-05 5 views
0

私はいくつかの異常な問題があります。 kafkaから受け取ったrddを処理しようとすると、sparkContextにアクセスしようとすると例外が発生します(java.lang.NullPointerException)。それはそうだ、なぜ私は最初のRDDを処理するときにスパークストリーミングでSparkContextが失われる

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
     rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext)) 
    } 

しかし、問題は

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
    rddProcessor.processingRDD(rdd.first(), sqlContext) 
    } 

を発生しない、私は本当に知らない:RDDProcessorは問題がこれを起動する

def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = { 
val stringFromByte = b2s(byteArray) 
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n")) 
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq)) 
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema) 
dateframe 
} 

直列化可能です問題がある。誰かがヒントを持っている場合は、私は私がStreamingContext

val sparkConf = new SparkConf().setAppName("KafkaConsumer") 
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration)) 
+0

sscを定義するコードを提供できますか:SparkContext? – ponkin

+0

OK、私はこれを投稿しました – kasiula03

答えて

0

を定義

@EDIT 感謝するだろうまあ、SparkContextはシリアライズないが、それはそれは@transientとしてマークされていますSparkSession、経由SqlContextで利用可能です。だから、foreachmapの引数のように、シリアル化する必要のあるラムダでは使用することはできません(foreachRDDのものではありませんが、SparkContextを使用しないようにprocessingRDDを書き込むことはできません)。 )。

+0

私は理解しますが、それはなぜ1つのrddのために働くのですか?オブジェクトがシリアル化されるとき、すべての関数もシリアル化されますか? – kasiula03

+0

'foreachRDD'は引数を他のノードに送信しません。ドライバノード上では完全に実行されます(' rdd.first() 'は他のノードからドライバノードに要素を送ります)。 'rdd.foreach'はそこでそれを実行するために引数を各ノードに送る必要があります。 –

+0

この問題をスキップする方法spddContextを使ってこのrddを処理する必要があるときは? rdd.colect()は動作しますが、正しくはありません – kasiula03

関連する問題