2016-12-06 3 views
2

私はKafkaからDStreamを経由して到着するデータを持っています。私はいくつかのキーワードを得るために特徴抽出を実行したい。Apache SparkでDStreamでフィーチャ抽出を使用する方法

すべてのデータが到着するのを待ってはいけません(終了する可能性のない連続ストリームを意図しているため)。苦しみます。

は、これまでのところ、私は一緒にそのような何かに置く:

def extractKeywords(stream: DStream[Data]): Unit = { 

    val spark: SparkSession = SparkSession.builder.getOrCreate 

    val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData 

    val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _ 

    val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData 

    streamWithFeatures.print() 
} 

def extractFeatures(spark: SparkSession) 
        (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = { 

    val df = spark.createDataFrame(rdd).toDF("data", "words") 

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures) 
    val rawFeatures = hashingTF.transform(df) 

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
    val idfModel = idf.fit(rawFeatures) 

    val rescaledData = idfModel.transform(rawFeature) 

    import spark.implicits._ 
    rescaledData.select("data", "features").as[(Data, Array[String])].rdd 
} 

をしかし、私はjava.lang.IllegalStateException: Haven't seen any document yet.を受けた - 私はちょうど一緒に物事をスクラップして試してみると驚いていない、と私は待っているわけではないので、私はそれを理解しますいくつかのデータの到着、私はデータ上でそれを使用しようとすると、生成されたモデルは空であるかもしれません。

この問題にはどのようなアプローチが適していますか?

私はコメントから助言及び2つの実行に手続きを分割使用

答えて

0

:IDFモデルを計算し、ファイルからIDFモデルを読み込み

def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = { 
    val session: SparkSession = SparkSession.builder.getOrCreate 

    val wordsDf = session.createDataFrame(rdd).toDF("data", "words") 

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
    val featurizedDf = hashingTF.transform(wordsDf) 

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
    val idfModel = idf.fit(featurizedDf) 

    idfModel.write.save(idfModelFile.getAbsolutePath) 
} 
  • 1をファイルに保存します

    • 1とすべての着信情報について実行するだけです。

      val idfModel = IDFModel.load(idfModelFile.getAbsolutePath) 
      
      val documentDf = spark.createDataFrame(rdd).toDF("update", "document") 
      
      val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words") 
      val wordsDf = tokenizer.transform(documentDf) 
      
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
      val featurizedDf = hashingTF.transform(wordsDf) 
      
      val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features") 
      val featuresDf = extractor.transform(featurizedDf) 
      
      featuresDf.select("update", "features") 
      
  • 関連する問題