2017-01-09 4 views
1

*こんにちはすべて、データフレームに余分な列としてタイムスタンプを追加するにはどうすればいいですか?

私はあなたにとって簡単な質問があります。 私は、createStreamメソッドを使ってkafkaストリーミングから作成したRDDを持っています。 これをデータフレームに変換する前に、このrddの値としてタイムスタンプを追加します。 私は(withColumnを使用してデータフレームに値を追加するためにやってみました)が、ヴァル・D = dataframe.withColumn( "timeStamp_column"、dataframe.col( "今")*

val topicMaps = Map("topic" -> 1) 
    val now = java.util.Calendar.getInstance().getTime() 

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER) 

     messages.foreachRDD(rdd => 
      { 

      val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
      import sqlContext.implicits._ 

      val dataframe = sqlContext.read.json(rdd.map(_._2)) 



     val d =dataframe.withColumn("timeStamp_column",dataframe.col("now")) 

このエラーを返してきました) org.apache.spark.sql.AnalysisException:(action、device_os_ver、device_type、event_name、 item_name、lat、lon、memberid、productUpccd、tenantid)の中でカラム名 "now"を解決できません。 org.apache.spark.sql.DataFrame $$ anonfunの$の解決の$ 1.apply(DataFrame.scalaで :15

私は、彼らは不変でデータフレームを変更することはできませんが、RDDSは不変であることを知るようになったとして同様。 はそれを行うための最善の方法は何である。 RDDの値に(動的RDDにタイムスタンプを追加する)。

+0

データフレームcを定義しましたか?スキーマを追加できますか? –

+0

申し訳ありませんが、cはデータフレームです。私はそれを修正させてください。私は急いでいた。 –

答えて

0

の場合は、タイムスタンプのような定数で新しい列を追加する方法、あなたは​​3210を使用することができます関数:

import org.apache.spark.sql.functions._ 
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis)) 
+0

こんにちはJavierさん、こんにちは、このコードは私に頭痛を与えています、時々それはスキーマを印刷しています、そして時々それはこれを投げています java.lang.IllegalArgumentException:scala.Predef $ .require(Predef.scala:221) org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)(org.apache.spark.sql.catalyst.analysis.Analyzer)$ ResolveReferences $$ anonfun $適用$ 10 $$ anonfu –

+0

ハイジャックこれは奇妙です、あなたはスパークのどのバージョンを使用していますか?これはspark-shell 1.6.3でテストされた例です。 'import org.apache.spark.sql.functions._' ' val oldDF = sc.parallelize(Seq((1,1)、(2,1 )、(3,1)、(4,1)、(5,1)))。toDF' 'oldDF.show' ' val newDF = oldDF.withColumn( "timeStamp_column"、lit(System.currentTimeMillis))) ' ' newDF.show' 私はまた、Spark 2.1を使用しています。 –

+0

はいそれはスパークシェルでうまく動作しますが、sparkストリーミングアプリケーションでこのコードを使用している間に上記のエラーをスローします。私はスパーク1.6.1をJava 1.7で使用しています –

関連する問題