
I am trying to stream JSONArray data with Spark Streaming. Each JSONArray contains several JSONObjects. How can I extract each JSONObject from the JSONArray and save it to Cassandra in Spark Streaming?

I would like to store each JSONObject in a DataFrame, map it against another table, and then save it to a Cassandra table.

I tried creating a DataFrame to hold the JSONObjects, but creating the DataFrame inside stream.foreachRDD throws a NullPointerException. Is this because Spark does not support nested RDDs? If so, how can I save the JSONObjects to Cassandra?

The data format is as follows:

[ 
    { 
     "temperature":"21.8", 
     "humidity":"65.6", 
     "creatime":"2016-11-14 13:50:24", 
     "id":"3303136", 
     "msgtype":"th", 
     "sensorID":"001" 
    }, 
    { 
     "temperature":"23.1", 
     "humidity":"60.6", 
     "creatime":"2016-11-14 13:50:24", 
     "id":"3303137", 
     "msgtype":"th", 
     "sensorID":"002" 
    } 
] 

My code:

import kafka.serializer.StringDecoder 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 
import com.datastax.spark.connector.mapper.DefaultColumnMapper 
import com.datastax.spark.connector._ 

import org.apache.spark.SparkConf 
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } 
import org.apache.spark.sql._ 
import org.apache.spark.sql.cassandra._ 
import net.sf.json.JSONObject 
import net.sf.json.JSONArray 

object getkafkadata { 

    def main(args: Array[String]) { 

    val cassandraHostIP = "10.2.1.67" 
    val keyspaceToGet = "iot_test" 

    val conf = new SparkConf() 
     .setMaster("local") 
     .setAppName("PageViewStream") 
     .set("spark.driver.allowMultipleContexts", "true") 
     .set("spark.cassandra.connection.host", cassandraHostIP) 
    val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(sc, Seconds(5)) 
    val sqc = new SQLContext(sc) 

    val sqlContext = SQLContextSingleton.getInstance(sc) 
    import sqlContext.implicits._ 

    val cc = new CassandraSQLContext(sc) 
    cc.setKeyspace(keyspaceToGet) 

    val kafkaParams = Map[String, String](
     "metadata.broker.list" -> "10.2.1.67:6667", 
     "group.id" -> "a13", 
     "auto.offset.reset" -> "smallest") 

    val topics = Set("test1208") 
    println("kafkaParams=", kafkaParams, "topics=", topics) 

    val offsetsList = 0 
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

    println("Line3 good!") 

    println("Start to parse json...") 

    val datas = stream.foreachRDD(rdd => { 
     rdd.foreachPartition(partitionOfRecords => { 
     partitionOfRecords.foreach(line => { 
      val event = JSONArray.fromObject(line._2) 
      for (n <- 0 to event.size() - 1) { 
      val eventobj = event.getJSONObject(n) 

      println("======= Message =======") 
      println(eventobj.toString()) 

      //data lost exception handling 
      var sensorID = "no_data" 
      var humidity = "0" 
      var temperature = "0" 
      var msgtype = "no_data" 
      var creatime = "0" 
      var id = "no_data" 

      if (eventobj.has("sensorID")) 
       sensorID = eventobj.getString("sensorID") 
      if (eventobj.has("humidity")) 
       humidity = eventobj.getString("humidity") 
      if (eventobj.has("temperature")) 
       temperature = eventobj.getString("temperature") 
      if (eventobj.has("msgtype")) 
       msgtype = eventobj.getString("msgtype") 
      if (eventobj.has("creatime")) 
       creatime = eventobj.getString("creatime") 
      if (eventobj.has("id")) 
       id = eventobj.getString("id") 

      var df = cc.createDataFrame(Seq(
       (sensorID, humidity, temperature, msgtype, creatime, id))) 
       .toDF("sensorID", "humidity", "temperature", "msgtype", "creatime", "id") 

      println("==========df create done=========") 
      df.show() 

      } 
     }) 
     }) 
    }) 
    ssc.start()
    ssc.awaitTermination()
  }
}

Exception message:

16/12/12 09:28:35 ERROR JobScheduler: Error running job streaming job 1481506110000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638) 
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:75) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:74) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NullPointerException 
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638) 
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76) 
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    ... 3 more 
16/12/12 09:28:35 INFO DAGScheduler: ResultStage 1 (foreachPartition at getkafkadata.scala:75) finished in 0.063 s 
16/12/12 09:28:35 INFO DAGScheduler: Job 1 finished: foreachPartition at getkafkadata.scala:75, took 0.098511 s 

1 Answer

You cannot create a DataFrame inside an RDD closure; DataFrame operations make no sense at the executor level.

Instead, transform the data in the RDD into the desired format and perform the DataFrame operations at the driver level.

Example (partial code illustrating the structural change). Note how the RDD data is transformed first and only then turned into a DataFrame on the driver:

import scala.collection.JavaConverters._

stream.foreachRDD { rdd =>
  // transform the raw Kafka records into structured rows on the executors
  val parsedData = rdd.flatMap { record =>
    val events = JSONArray.fromObject(record._2)
    events.asScala.map(json => ??? /* parse + transform each entry into a Record */)
  }
  // build the DataFrame at the driver level, not inside a nested closure
  val df = cc.createDataFrame(parsedData)
  // write to Cassandra
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "sensordata", "keyspace" -> "iot"))
    .save()
}
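A minimal sketch of what the placeholder above could look like, assuming a hypothetical Record case class and toRecord helper modelled on the JSON fields shown in the question (these names are illustrative and not part of the original answer):

import net.sf.json.JSONObject

// Hypothetical row type mirroring the JSON fields from the question.
case class Record(sensorID: String, humidity: String, temperature: String,
                  msgtype: String, creatime: String, id: String)

// Hypothetical helper: extract each field from a JSONObject, falling back to
// the same defaults the question uses when a key is missing.
def toRecord(obj: JSONObject): Record = {
  def field(key: String, default: String) =
    if (obj.has(key)) obj.getString(key) else default
  Record(
    sensorID    = field("sensorID", "no_data"),
    humidity    = field("humidity", "0"),
    temperature = field("temperature", "0"),
    msgtype     = field("msgtype", "no_data"),
    creatime    = field("creatime", "0"),
    id          = field("id", "no_data"))
}

With such a helper, the flatMap body could be written as (0 until events.size).map(i => toRecord(events.getJSONObject(i))), producing an RDD[Record] that createDataFrame can consume directly, with column names matching those in the question.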

Thank you! Your example really helps. – gogocatmario
