*こんにちはすべて、データフレームに余分な列としてタイムスタンプを追加するにはどうすればいいですか?
私はあなたにとって簡単な質問があります。 私は、createStreamメソッドを使ってkafkaストリーミングから作成したRDDを持っています。 これをデータフレームに変換する前に、このrddの値としてタイムスタンプを追加します。 私は(withColumnを使用してデータフレームに値を追加するためにやってみました)が、ヴァル・D = dataframe.withColumn( "timeStamp_column"、dataframe.col( "今")*
val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
messages.foreachRDD(rdd =>
{
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val dataframe = sqlContext.read.json(rdd.map(_._2))
val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))
このエラーを返してきました) org.apache.spark.sql.AnalysisException:(action、device_os_ver、device_type、event_name、 item_name、lat、lon、memberid、productUpccd、tenantid)の中でカラム名 "now"を解決できません。 org.apache.spark.sql.DataFrame $$ anonfunの$の解決の$ 1.apply(DataFrame.scalaで :15
私は、彼らは不変でデータフレームを変更することはできませんが、RDDSは不変であることを知るようになったとして同様。 はそれを行うための最善の方法は何である。 RDDの値に(動的RDDにタイムスタンプを追加する)。
データフレームcを定義しましたか?スキーマを追加できますか? –
申し訳ありませんが、cはデータフレームです。私はそれを修正させてください。私は急いでいた。 –