2016-10-06 6 views
2

Hiveテーブルにある数値データに対して、PySpark(Spark 1.6.2)を使用して主成分分析(PCA)を実行したいと考えています。私は、Sparkのデータフレームにハイブテーブルをインポートすることができるよ:PySpark PCA:データフレームの行を複数の列から単一の列に変換する方法DenseVector?

>>> from pyspark.sql import HiveContext 
>>> hiveContext = HiveContext(sc) 
>>> dataframe = hiveContext.sql("SELECT * FROM my_table") 
>>> type(dataframe) 
<class 'pyspark.sql.dataframe.DataFrame'> 
>>> dataframe.columns 
['par001', 'par002', 'par003', etc...] 
>>> dataframe.collect() 
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...] 

PySparkにPCAを実行する方法を示し優れたStackOverflowのポストがあります:ポストの「試験」セクションでhttps://stackoverflow.com/a/33481471/2626491

>>> from pyspark.ml.feature import * 
>>> from pyspark.mllib.linalg import Vectors 
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), 
...   (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), 
...   (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)] 
>>> df = sqlContext.createDataFrame(data,["features"]) 
>>> type(df) 
<class 'pyspark.sql.dataframe.DataFrame'> 
>>> df.columns 
['features'] 
>>> df.collect() 
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]))] 

@のdesertnautの例示データフレームの各行は、次にpca関数によって使用されるDenseVectorオブジェクトが含ま:@desertnautは(「特徴」と呼ばれる)だけつの列を有するデータフレームを作成します。

Q)データフレームをHiveから単一列のデータフレーム(「機能」)に変換するにはどうすればよいですか?各行には元の行のすべての値を表すDenseVectorが含まれていますか?

答えて

4

VectorAssemblerを使用してください。データはこれに類似している場合:

from pyspark.sql import Row 

data = sc.parallelize([ 
    Row(par001=1.1, par002=5.5, par003=8.2), 
    Row(par001=0.0, par002=5.7, par003=4.2) 
]).toDF() 

あなたは必要なクラスをインポートする必要があります。

from pyspark.ml.feature import VectorAssembler 

は、インスタンスを作成します。

assembler = VectorAssembler(inputCols=data.columns, outputCol="features") 

変換と選択します。

assembler.transform(data).select("features") 

あなたはユーザー定義関数を使用することもできます。

from pyspark.mllib.linalg import Vectors, VectorUDT 

udfsql.functionsから:

from pyspark.sql.functions import udf, array 

を選択:スパークmllibから1.6輸入VectorsVectorUDT

data.select(
    udf(Vectors.dense, VectorUDT())(*data.columns) 
).toDF("features") 

これはそれほど冗長が、はるかに遅いです。

関連する問題