2016-10-03 9 views
2

プレLDA変換をデータに変換する際にコンパイルエラーが発生しました。 SPARK 2.0でSCALAを使用しています。エラーを投げている特定のコードは以下のとおりである:value toDFはorg.apache.spark.rdd.RDDのメンバーではありません。[(Long、org.apache.spark.ml.linalg.Vector)]

val documents = PreLDAmodel.transform(mp_listing_lda_df) 
    .select("docId","features") 
    .rdd 
    .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) } 
    .toDF() 

完全なコンパイルエラーがある:それことを考え

import java.io.FileInputStream 
import java.sql.{DriverManager, ResultSet} 
import java.util.Properties 

import org.apache.spark.SparkConf 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.clustering.LDA 
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover} 
import org.apache.spark.ml.linalg.{Vector => MLVector} 
import org.apache.spark.mllib.clustering.{LDA => oldLDA} 
import org.apache.spark.rdd.JdbcRDD 
import org.apache.spark.sql.types.{StringType, StructField, StructType} 
import org.apache.spark.sql.{Row, SparkSession} 

object MPClassificationLDA { 
    /*Start: Configuration variable initialization*/ 
    val props = new Properties 
    val fileStream = new FileInputStream("U:\\JIRA\\MP_Classification\\target\\classes\\mpclassification.properties") 
    props.load(fileStream) 
    val mpExtract = props.getProperty("mpExtract").toString 
    val shard6_db_server_name = props.getProperty("shard6_db_server_name").toString 
    val shard6_db_user_id = props.getProperty("shard6_db_user_id").toString 
    val shard6_db_user_pwd = props.getProperty("shard6_db_user_pwd").toString 
    val mp_output_file = props.getProperty("mp_output_file").toString 
    val spark_warehouse_path = props.getProperty("spark_warehouse_path").toString 
    val rf_model_file_path = props.getProperty("rf_model_file_path").toString 
    val windows_hadoop_home = props.getProperty("windows_hadoop_home").toString 
    val lda_vocabulary_size = props.getProperty("lda_vocabulary_size").toInt 
    val pre_lda_model_file_path = props.getProperty("pre_lda_model_file_path").toString 
    val lda_model_file_path = props.getProperty("lda_model_file_path").toString 
    fileStream.close() 
    /*End: Configuration variable initialization*/ 

    val conf = new SparkConf().set("spark.sql.warehouse.dir", spark_warehouse_path) 

    def main(arg: Array[String]): Unit = { 
    //SQL Query definition and parameter values as parameter upon executing the Object 
    val cont_id = "14211599" 
    val top = "100000" 
    val start_date = "2016-05-01" 
    val end_date = "2016-06-01" 

    val mp_spark = SparkSession 
     .builder() 
     .master("local[*]") 
     .appName("MPClassificationLoadLDA") 
     .config(conf) 
     .getOrCreate() 
    MPClassificationLDACalculation(mp_spark, cont_id, top, start_date, end_date) 
    mp_spark.stop() 
    } 

    private def MPClassificationLDACalculation 
    (mp_spark: SparkSession 
    ,cont_id: String 
    ,top: String 
    ,start_date: String 
    ,end_date: String 
): Unit = { 

    //DB connection definition 
    def createConnection() = { 
     Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance(); 
     DriverManager.getConnection("jdbc:sqlserver://" + shard6_db_server_name + ";user=" + shard6_db_user_id + ";password=" + shard6_db_user_pwd); 
    } 

    //DB Field Names definition 
    def extractvalues(r: ResultSet) = { 
     Row(r.getString(1),r.getString(2)) 
    } 

    //Prepare SQL Statement with parameter value replacement 
    val query = """SELECT docId = audt_id, text = auction_title FROM brands6.dbo.uf_ds_marketplace_classification_listing(@cont_id, @top, '@start_date', '@end_date') WHERE ? < ? OPTION(RECOMPILE);""" 
     .replaceAll("@cont_id", cont_id) 
     .replaceAll("@top", top) 
     .replaceAll("@start_date", start_date) 
     .replaceAll("@end_date", end_date) 
     .stripMargin 

    //Connect to Source DB and execute the Prepared SQL Steatement 
    val mpDataRDD = new JdbcRDD(mp_spark.sparkContext 
     ,createConnection 
     ,query 
     ,lowerBound = 0 
     ,upperBound = 10000000 
     ,numPartitions = 1 
     ,mapRow = extractvalues) 

    val schema_string = "docId,text" 
    val fields = StructType(schema_string.split(",") 
     .map(fieldname => StructField(fieldname, StringType, true))) 

    //Create Data Frame using format identified through schema_string 
    val mpDF = mp_spark.createDataFrame(mpDataRDD, fields) 
    mpDF.collect() 

    val mp_listing_tmp = mpDF.selectExpr("cast(docId as long) docId", "text") 
    mp_listing_tmp.printSchema() 
    println(mp_listing_tmp.first) 

    val mp_listing_lda_df = mp_listing_tmp.withColumn("docId", mp_listing_tmp("docId")) 
    mp_listing_lda_df.printSchema() 

    val tokenizer = new RegexTokenizer() 
     .setInputCol("text") 
     .setOutputCol("rawTokens") 
     .setMinTokenLength(2) 

    val stopWordsRemover = new StopWordsRemover() 
     .setInputCol("rawTokens") 
     .setOutputCol("tokens") 

    val vocabSize = 4000 

    val countVectorizer = new CountVectorizer() 
     .setVocabSize(vocabSize) 
     .setInputCol("tokens") 
     .setOutputCol("features") 

    val PreLDApipeline = new Pipeline() 
     .setStages(Array(tokenizer, stopWordsRemover, countVectorizer)) 

    val PreLDAmodel = PreLDApipeline.fit(mp_listing_lda_df) 
    //comment out after saving it the first time 
    PreLDAmodel.write.overwrite().save(pre_lda_model_file_path) 

    val documents = PreLDAmodel.transform(mp_listing_lda_df) 
     .select("docId","features") 
     .rdd 
     .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) } 
     .toDF() 

    //documents.printSchema() 
    val numTopics: Int = 20 
    val maxIterations: Int = 100 

    //note the FeaturesCol need to be set 
    val lda = new LDA() 
     .setOptimizer("em") 
     .setK(numTopics) 
     .setMaxIter(maxIterations) 
     .setFeaturesCol(("_2")) 

    val vocabArray = PreLDAmodel.stages(2).asInstanceOf[CountVectorizerModel].vocabulary 
    } 
} 

アム:ここ

Error:(132, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 
possible cause: maybe a semicolon is missing before `value toDF'? 
     .toDF() 

は完全なコードですコードのインポートセクションの競合に関連しています。助けをお待ちしています。行われるために必要な

答えて

2

2物事:

インポート暗黙:これはorg.apache.spark.sql.SQLContextのインスタンスが作成された後にのみ行われるべきであることに注意してください。メソッドの外

val sqlContext= new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 

移動ケースクラス:それはのように記述しなければならないケースクラス、あなたがデータフレームのスキーマを定義するのに使用することにより、それを必要とする方法の外で定義されなければなりません。ここでそれについてもっと読むことができます:https://issues.scala-lang.org/browse/SI-6649

+0

また、すでにSparkSessionをお持ちの場合は、それを参照することができます。 import sc.sqlContext.implicits._ – Michael

関連する問題