2016-04-04 26 views
2

私はWikipedia XMLダンプでLDAを実行しようとしています。生のテキストのRDDを取得した後、私はデータフレームを作成し、それをTokenizer、StopWordsおよびCountVectorizerのパイプラインを通して変換しています。私はCountVectorizerのベクトルのRDDをMLLibのOnlineLDAに渡すつもりです。 は、ここに私のコードです:私はので、ラインのJavaRDD <Row>をJavaRDDに変換する<Vector>

Object[] arr = row.getList(0).toArray(); 


Caused by: java.lang.ClassCastException: org.apache.spark.mllib.linalg.SparseVector cannot be cast to scala.collection.Seq 
at org.apache.spark.sql.Row$class.getSeq(Row.scala:278) 
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:192) 
at org.apache.spark.sql.Row$class.getList(Row.scala:286) 
at org.apache.spark.sql.catalyst.expressions.GenericRow.getList(rows.scala:192) 
at xmlProcess.ParseXML$2.call(ParseXML.java:142) 
at xmlProcess.ParseXML$2.call(ParseXML.java:1) 

クラスキャスト例外を取得しています

// Configure an ML pipeline 
RegexTokenizer tokenizer = new RegexTokenizer() 
    .setInputCol("text") 
    .setOutputCol("words"); 

StopWordsRemover remover = new StopWordsRemover() 
      .setInputCol("words") 
      .setOutputCol("filtered"); 

CountVectorizer cv = new CountVectorizer() 
      .setVocabSize(vocabSize) 
      .setInputCol("filtered") 
      .setOutputCol("features"); 

Pipeline pipeline = new Pipeline() 
      .setStages(new PipelineStage[] {tokenizer, remover, cv}); 

// Fit the pipeline to train documents. 
PipelineModel model = pipeline.fit(fileDF); 

JavaRDD<Vector> countVectors = model.transform(fileDF) 
      .select("features").toJavaRDD() 
      .map(new Function<Row, Vector>() { 
      public Vector call(Row row) throws Exception { 
       Object[] arr = row.getList(0).toArray(); 

       double[] features = new double[arr.length]; 
       int i = 0; 
       for(Object obj : arr){ 
        features[i++] = (double)obj; 
       } 
       return Vectors.dense(features); 
      } 
      }); 

私はこのhereを行うにはScalaの構文が見つかりましたが、それを行うための任意の例を見つけることができませんでしたJavaで。私はrow.getAs[Vector](0)を試しましたが、これはScalaの構文です。どのようにJavaでそれを行うには?

答えて

3

私はVectorへの単純なキャストでこれを行うことができました。なぜ私は単純なものを最初に試していないのかわかりません!

  JavaRDD<Vector> countVectors = model.transform(fileDF) 
       .select("features").toJavaRDD() 
       .map(new Function<Row, Vector>() { 
       public Vector call(Row row) throws Exception { 
        return (Vector)row.get(0); 
       } 
       }); 
0

あなたはそれがLDAで動作するためJavaRDDDataFrame/DataSetを転化する必要はありません。手抜きの数時間後、私は最終的にrddScalaに入れました。

関連の輸入:

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover} 
import org.apache.spark.ml.linalg.{Vector => MLVector} 
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer} 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.sql.{Row, SparkSession} 

コードのスニペットは、残りの部分を次のthis exampleと同じまま:

val cvModel = new CountVectorizer() 
     .setInputCol("filtered") 
     .setOutputCol("features") 
     .setVocabSize(vocabSize) 
     .fit(filteredTokens) 


val countVectors = cvModel 
     .transform(filteredTokens) 
     .select("docId","features") 
     .rdd.map { case Row(docId: String, features: MLVector) => 
        (docId.toLong, Vectors.fromML(features)) 
       } 
val mbf = { 
    // add (1.0/actualCorpusSize) to MiniBatchFraction be more robust on tiny datasets. 
    val corpusSize = countVectors.count() 
    2.0/maxIterations + 1.0/corpusSize 
    } 
    val lda = new LDA() 
    .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf))) 
    .setK(numTopics) 
    .setMaxIterations(2) 
    .setDocConcentration(-1) // use default symmetric document-topic prior 
    .setTopicConcentration(-1) // use default symmetric topic-word prior 

    val startTime = System.nanoTime() 
    val ldaModel = lda.run(countVectors) 
    val elapsed = (System.nanoTime() - startTime)/1e9 

    /** 
    * Print results. 
    */ 
    // Print training time 
    println(s"Finished training LDA model. Summary:") 
    println(s"Training time (sec)\t$elapsed") 
    println(s"==========") 

おかげでコードhereの作者に行きます。

関連する問題