私はいくつかの異常な問題があります。 kafkaから受け取ったrddを処理しようとすると、sparkContextにアクセスしようとすると例外が発生します(java.lang.NullPointerException)。それはそうだ、なぜ私は最初のRDDを処理するときにスパークストリーミングでSparkContextが失われる
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
}
しかし、問題は
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rddProcessor.processingRDD(rdd.first(), sqlContext)
}
を発生しない、私は本当に知らない:RDDProcessorは問題がこれを起動する
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}
直列化可能です問題がある。誰かがヒントを持っている場合は、私は私がStreamingContext
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
sscを定義するコードを提供できますか:SparkContext? – ponkin
OK、私はこれを投稿しました – kasiula03