2016-10-07 4 views
1

で例外を発生させ、私は火花とScalaの両方に非常に新しいです、とに似たcsvファイルをロードしようとしています:スパーク2時間タイプテイク(5)

A,09:33:57.570 
B,09:43:02.577 
... 

私は見るだけ時間的タイプscala.sql.typesでTimestampTypeですので、私はcsvファイルをロードしています:、

val schema = StructType(Array(StructField("A", StringType, true), StructField("time", TimestampType, true))) 

val table = spark.read.option("header","false").option("inferSchema","false").schema(schema).csv("../table.csv") 

これは私がなどtable.show()またはtable.take(5)を行うまでは正常に動作するようです。その場合には、私は次の例外を取得:

scala> table.show() 
16/10/07 16:32:25 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.IllegalArgumentException 
     at java.sql.Date.valueOf(Date.java:143) 
     at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) 
     at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:287) 
     at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:115) 
     at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:84) 
     at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:125) 
     at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$1.apply(CSVFileFormat.scala:124) 
     at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 

スパーク内に時間データを保存する方法がありますか?私もそれを文字列として残して、各値のjava.timeからLocalTime.parse()をマッピングしようとしましたが、型のエンコーダーが存在しないと言っても失敗します。

答えて

0

時刻データを直接入力できるSQL型はありません。おそらく、LongTypeunix_timestampと解析するとよいでしょう。それはに類似したデータフレームとなるはずである

StructField("time", StringType, true))) 

でデータを読む:

val df = Seq(("A", "09:33:57.570"), ("B", "09:43:02.577")).toDF("A", "time") 

シンプルな日付フォーマット定義:

val format = "HH:mm:ss.SSS" 

と解析のためにそれを使用します。

df.withColumn("seconds", unix_timestamp($"time", format)) 

残念ながら、これはすべてですossy変換。あなたはミリ秒を保持したい場合は

+---+------------+-------+ 
| A|  time|seconds| 
+---+------------+-------+ 
| A|09:33:57.570| 30837| 
| B|09:43:02.577| 31382| 
+---+------------+-------+ 

ので、あなたがそうであるようにjava.time.LocalTimeを使用してtoNanoOfDayの結果を格納することができます。

val nanoOfDay = udf((s: String) => 
    java.time.LocalTime.parse(s).toNanoOfDay) 

df.withColumn("nanseconds", nanoOfDay($"time")) 
0

日付/時刻操作については、JodaTimeを参照することもできます。これをpom.xml(Maven用)に含めることができます

<dependency> 
     <groupId>joda-time</groupId> 
     <artifactId>joda-time</artifactId> 
     <version>2.9</version> 
    </dependency> 

    <dependency> 
     <groupId>org.joda</groupId> 
     <artifactId>joda-convert</artifactId> 
     <version>1.8.1</version> 
    </dependency> 
関連する問題