2

私はadult datasetでSparkとScalaのサンプルを実行しようとしています。パイプラインのSpark DataframeのOneHotEncoder

scala 2.11.8およびspark 1.6.1を使用する。 (今のところ)

問題は、すべてのスパークMLアルゴリズムはその仕事をすることができます前に、数字に符号化されることを必要とするデータセットのカテゴリの特徴量である。..

これまでのところ、私はこれがあります

import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.classification.LogisticRegression 
import org.apache.spark.ml.feature.OneHotEncoder 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object Adult { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Adult example").setMaster("local[*]") 
    val sparkContext = new SparkContext(conf) 
    val sqlContext = new SQLContext(sparkContext) 

    val data = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") // Use first line of all files as header 
     .option("inferSchema", "true") // Automatically infer data types 
     .load("src/main/resources/adult.data") 

    val categoricals = data.dtypes filter (_._2 == "StringType") 
    val encoders = categoricals map (cat => new OneHotEncoder().setInputCol(cat._1).setOutputCol(cat._1 + "_encoded")) 
    val features = data.dtypes filterNot (_._1 == "label") map (tuple => if(tuple._2 == "StringType") tuple._1 + "_encoded" else tuple._1) 

    val lr = new LogisticRegression() 
     .setMaxIter(10) 
     .setRegParam(0.01) 
    val pipeline = new Pipeline() 
     .setStages(encoders ++ Array(lr)) 

    val model = pipeline.fit(training) 
    } 
} 

ただし、これは機能しません。 pipeline.fitを呼び出すと、元の文字列フィーチャが含まれているため、例外がスローされます。 これらの"StringType"列をパイプラインで削除するにはどうすればよいですか? あるいは、私はそれを完全に間違っていると思います。誰かが別の提案をしていると、私はすべての入力に満足しています:)。

私がこの流れに従うことを選んだのは、私がPythonとPandasで豊富な背景を持っていたからですが、ScalaとSparkの両方を学びたいからです。

答えて

3

高レベルのフレームワークに慣れていれば、ここでは混乱する可能性があることが1つあります。エンコーダーを使用するには、その前にフィーチャーを索引付けする必要があります。それはthe API docsに説明するとおり

ワンホット符号(...)を示す行当たりせいぜい単一の値と、カテゴリインデックスバイナリーベクターのカラムにの列をマッピングカテゴリインデックスを入力します。

import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder} 

val df = Seq((1L, "foo"), (2L, "bar")).toDF("id", "x") 

val categoricals = df.dtypes.filter (_._2 == "StringType") map (_._1) 

val indexers = categoricals.map (
    c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx") 
) 

val encoders = categoricals.map (
    c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc") 
) 

val pipeline = new Pipeline().setStages(indexers ++ encoders) 

val transformed = pipeline.fit(df).transform(df) 
transformed.show 

// +---+---+-----+-------------+ 
// | id| x|x_idx|  x_enc| 
// +---+---+-----+-------------+ 
// | 1|foo| 1.0| (1,[],[])| 
// | 2|bar| 0.0|(1,[0],[1.0])| 
// +---+---+-----+-------------+ 

あなたがパイプラインから文字列をドロップする必要はありません見ることができるように。実際にはOneHotEncoderNominalAttributeBinaryAttributeまたは欠落した型属性を持つ数値列を受け入れます。

+0

私はあなたの例を試しましたが、それは実際に実行されています。私は 'StringIndexer'のドキュメントも読んでいますが、なぜこれを' OneHotEncoder'と組み合わせて使う必要があるのか​​分かりません...どちらも同様のことをしているようです。 「なぜ」をもう少し詳しく説明できますか? – Tim

+0

ここに「大きな理由」はありません。それは単なる設計の選択であり、私が知っている限り、特別な意味はありません。実用的な観点からは、スタンドアロンの変換として実行することができます。 – zero323