Inserting DataFrame data into an RDD

I currently have a DataFrame containing the results of a SQL query. The DataFrame has columns for patient ID, date, and code:

val res1 = sqlContext.sql("select encounter.Member_ID AS patientID, encounter.Encounter_DateTime AS date, diag.code from encounter join diag on encounter.Encounter_ID = diag.Encounter_ID") 

I am trying to take this DataFrame and place it into an RDD of type RDD[Diagnostic], where Diagnostic is a case class of the form:

case class Diagnostic(patientID:String, date: Date, code: String) 

Is this possible? My current attempt returns a scala.MatchError from the lines below:

val diagnostic: RDD[Diagnostic] = res1.map { 
    case Row(patientID:String, date:java.util.Date, code:String) => Diagnostic(patientID=patientID, date=date, code=code) 
} 

Schema:

root 
|-- patientID: string (nullable = true) 
|-- date: string (nullable = true) 
|-- code: string (nullable = true) 

Error message from res1.as[Diagnostic]:

Main.scala:170: overloaded method value as with alternatives: 
[error] (alias: Symbol)org.apache.spark.sql.DataFrame <and> 
[error] (alias: String)org.apache.spark.sql.DataFrame 
[error] does not take type parameters 
[error]  val testlol: RDD[Diagnostic] = res1.as[Diagnostic] 
[error]          ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 3 s, completed Oct 9, 2016 3:16:38 PM 

Full error message:

[Stage 4:=======================================>     (2 + 1)/3]
16/10/09 14:23:32 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8) 
scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:64) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
16/10/09 14:23:32 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:64) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

16/10/09 14:23:32 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job 
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
[error]   at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
[error]   at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
[error]   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
[error]   at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
[error]   at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
[error]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
[error]   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
[error]   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
[error]   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
[error]   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
[error]   at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
[error]   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
[error]   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
[error]   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
[error]   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
[error]   at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
[error]   at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
[error]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
[error]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
[error]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
[error]   at org.apache.spark.scheduler.Task.run(Task.scala:64) 
[error]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
[error]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
[error]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
[error]   at java.lang.Thread.run(Thread.java:745) 
[error] 
[error] Driver stacktrace: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): scala.MatchError: [000961291-01,2005-06-21T19:45:00Z,584.9] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at edu.gatech.cse8803.main.Main$$anonfun$11.apply(Main.scala:168) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:64) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
[trace] Stack trace suppressed: run last compile:run for the full output. 
16/10/09 14:23:32 ERROR ContextCleaner: Error in cleaning thread 
java.lang.InterruptedException 
     at java.lang.Object.wait(Native Method) 
     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:146) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:144) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:143) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) 
16/10/09 14:23:32 ERROR Utils: Uncaught exception in thread SparkListenerBus 
java.lang.InterruptedException 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) 
     at java.util.concurrent.Semaphore.acquire(Semaphore.java:317) 
     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62) 
     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) 
     at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) 
     at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60) 
java.lang.RuntimeException: Nonzero exit code: 1 
     at scala.sys.package$.error(package.scala:27) 
[trace] Stack trace suppressed: run last compile:run for the full output. 
[error] (compile:run) Nonzero exit code: 1 
[error] Total time: 13 s, completed Oct 9, 2016 2:23:32 PM 

Can you show the whole error message? It will probably show the _actual_ type that doesn't match the one you're expecting. –


I've added the full error message above. – mongolol

Answer


java.util.Date is not a type of data that can be stored in a DataFrame. From the looks of it, date is a timestamp stored as a String. If I am right, the case class should be defined as:

case class Diagnostic(patientID: String, date: java.sql.Timestamp, code: String) 

and the pattern

case Row(patientID: String, date: java.util.Date, code: String) 

should be replaced with:

case Row(patientID: String, date: java.sql.Timestamp, code: String) 

with date cast to timestamp:

res1.select($"patientID", $"date".cast("timestamp"), $"code") 
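
As a quick sanity check, you can print the schema after the cast. This is a hedged sketch rather than part of the original answer; the .alias("date") is an extra precaution to keep the column name stable, since some Spark versions rename a cast column:

val casted = res1.select(
    $"patientID",
    $"date".cast("timestamp").alias("date"),
    $"code")
// date should now print as `timestamp (nullable = true)` instead of `string`
casted.printSchema()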

Finally, you should use the rdd method before mapping, for forward compatibility:

res1.select($"patientID", $"date".cast("timestamp"), $"code").rdd.map { 
    ... 
} 
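
Put together, a minimal end-to-end sketch (assuming import sqlContext.implicits._ is in scope for the $"..." column syntax; the match body is just the corrected pattern from above):

import java.sql.Timestamp
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import sqlContext.implicits._ // enables the $"..." column syntax

val diagnostic: RDD[Diagnostic] = res1
    .select($"patientID", $"date".cast("timestamp"), $"code")
    .rdd
    .map {
        // positional match, so column names don't matter here;
        // rows containing nulls would still throw scala.MatchError
        case Row(patientID: String, date: Timestamp, code: String) =>
            Diagnostic(patientID = patientID, date = date, code = code)
    }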

In general, though, I would recommend using the as method:

res1.as[Diagnostic].rdd 
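
One caveat: the "overloaded method value as ... does not take type parameters" compile error quoted in the question points to a Spark version older than 1.6, where DataFrame.as only takes an alias. The typed as[T], which returns a Dataset[T], requires Spark 1.6+ and an implicit Encoder, normally supplied by import sqlContext.implicits._. A sketch under that assumption (the .alias("date") is added here because as[T] resolves columns by name against the case-class fields):

import sqlContext.implicits._ // provides the implicit Encoder[Diagnostic]

val diagnostic: RDD[Diagnostic] = res1
    .select($"patientID", $"date".cast("timestamp").alias("date"), $"code")
    .as[Diagnostic] // typed Dataset[Diagnostic]
    .rdd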