2015-11-25 7 views
8

私は複数の列を持つorg.apache.spark.sql.DataFrameを持っています。 Iは、-1と1の間でデータをスケーリングし、IはStandardScalerを発見 スカラーのMinMax正規化

scala> val df = sqlContext.csvFile("tenop.csv") 
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string, 
    ip_crowding: string, lat_long_dist: double, stream_name_1: string] 

org.apache.spark.sql.DataFrame

ようなデータ型を保持するためのMinMax正規化または任意の技術を用いて、1つのカラム(lat_long_dist)スケールします私は変換を行うことができます前に、データセットを変換する必要があります。そこには、単純なクリーンな方法です。

答えて

9

私はあなたが何をしたいと思い、この

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{min, max, lit} 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match { 
    case Row(x: Double, y: Double) => (x, y) 
} 

val scaledRange = lit(2) // Range of the scaled variable 
val scaledMin = lit(-1) // Min value of the scaled variable 
val vNormalized = ($"v" - vMin)/(vMax - vMin) // v normalized to (0, 1) range 

val vScaled = scaledRange * vNormalized + scaledMin 

df.withColumn("vScaled", vScaled).show 

// +---+-----+--------------------+ 
// | k| v|    vScaled| 
// +---+-----+--------------------+ 
// | 1| 0.5| -0.3093093093093092| 
// | 2| 10.2| 0.27327327327327344| 
// | 3| 5.7|0.003003003003003...| 
// | 4|-11.0|    -1.0| 
// | 5| 22.3|     1.0| 
// +---+-----+--------------------+ 
11

のようなものは、ここでは、すでにスパークで遊んでいる別の提案ですされています。

mlパッケージでMinMaxScalerを使用しないのはなぜですか?

zero323と同じ例を試してみましょう。

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.ml.feature.MinMaxScaler 
import org.apache.spark.sql.functions.udf 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v")))) 

val vectorizeCol = udf((v:Double) => Vectors.dense(Array(v))) 
val df2 = df.withColumn("vVec", vectorizeCol(df("v")) 

val scaler = new MinMaxScaler() 
    .setInputCol("vVec") 
    .setOutputCol("vScaled") 
    .setMax(1) 
    .setMin(-1) 

scaler.fit(df2).transform(df2).show 
+---+-----+-------+--------------------+ 
| k| v|  vv|     vs| 
+---+-----+-------+--------------------+ 
| 1| 0.5| [0.5]|[-0.3093093093093...| 
| 2| 10.2| [10.2]|[0.27327327327327...| 
| 3| 5.7| [5.7]|[0.00300300300300...| 
| 4|-11.0|[-11.0]|    [-1.0]| 
| 5| 22.3| [22.3]|    [1.0]| 
+---+-----+-------+--------------------+ 

複数の列を同時にスケーリングすることを利用してください。

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0), 
    (2.0, 0.0, 0.0), 
    (0.0, 1.0, -1.0) 
)).toDF("a", "b", "c") 

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array("a", "b", "c")) 
    .setOutputCol("features") 

val df2 = assembler.transform(df) 

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show 
+---+----+----+--------------+--------------------+ 
| a| b| c|  features|  scaledFeatures| 
+---+----+----+--------------+--------------------+ 
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|  [0.0,-1.0,1.0]| 
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...| 
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|  [-1.0,1.0,-1.0]| 
+---+----+----+--------------+--------------------+ 
+0

優れた答えは、それは私のために多くの時間を節約します:) –

+0

嬉しいことに@MostafaAlaa – Lyle