2
私はKafkaからDStreamを経由して到着するデータを持っています。私はいくつかのキーワードを得るために特徴抽出を実行したい。Apache SparkでDStreamでフィーチャ抽出を使用する方法
すべてのデータが到着するのを待ってはいけません(終了する可能性のない連続ストリームを意図しているため)。苦しみます。
は、これまでのところ、私は一緒にそのような何かに置く:
def extractKeywords(stream: DStream[Data]): Unit = {
val spark: SparkSession = SparkSession.builder.getOrCreate
val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData
streamWithFeatures.print()
}
def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
val df = spark.createDataFrame(rdd).toDF("data", "words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)
val rescaledData = idfModel.transform(rawFeature)
import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}
をしかし、私はjava.lang.IllegalStateException: Haven't seen any document yet.
を受けた - 私はちょうど一緒に物事をスクラップして試してみると驚いていない、と私は待っているわけではないので、私はそれを理解しますいくつかのデータの到着、私はデータ上でそれを使用しようとすると、生成されたモデルは空であるかもしれません。
この問題にはどのようなアプローチが適していますか?
私はコメントから助言及び2つの実行に手続きを分割使用