2015-10-21 3 views
7

Spark Scalaを使用してCSVファイルからすべての列のヒストグラムを計算しようとしています。Apache Spark Scalaを使用して大規模なCSV/RDD [配列[倍精度]]のすべての列のヒストグラムを取得するには?

ヒストグラムをサポートするDoubleRDDFunctionsが見つかりました。 したがって、すべての列のヒストグラムを取得するために次のようにコード化しました。

  1. 列数
  2. は、各列のRDD[double]を作成し、DoubleRDDFunctions

    var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1) 
    
    val histogramData = columnIndexArray.map(columns => { 
        rdd.map(lines => lines(columns)).histogram(6) 
    }) 
    

を使用して、各RDDのヒストグラムを計算しなさい、それは良い方法ですか? 誰かがこれに対処するいくつかのより良い方法を提案できますか?

ありがとうございます。中

val histCol1 = RDD.map(record => record.col_1).countByValue() 

答えて

5

正確に、より良いが、代替の方法ではありません明らかにされたハッシュテーブル(Scalaの地図を)返します。 RDDをDataFrameに変換し、histogram_numeric UDFを使用することです。

例データ:

import scala.util.Random 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.{callUDF, lit, col} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 

val sqlContext = new HiveContext(sc) 

Random.setSeed(1) 

val ncol = 5 

val rdd = sc.parallelize((1 to 1000).map(
    _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble)) 
)) 

val schema = StructType(
    (1 to ncol).map(i => StructField(s"x$i", DoubleType, false))) 

val df = sqlContext.createDataFrame(rdd, schema) 
df.registerTempTable("df") 

問合せ:

val nBuckets = 3 
val columns = df.columns.map(
    c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c)) 
val histograms = df.select(columns: _*) 

histograms.printSchema 

// root 
// |-- x1: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x2: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x3: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x4: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x5: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 

histograms.select($"x1").collect() 

// Array([WrappedArray([0.16874313309969038,334.0], 
// [0.513382068667877,345.0], [0.8421388886903808,321.0])]) 
+1

に適用します。私はspark 1.5.1を使用しています –

+0

UDFはHiveContextを必要とします。 – zero323

+0

ありがとう...あなたの答えに変数名を編集しました。 –

1

(ScalaのAPI)変換、countByValueはあなたのRDDで最初の列ためにヒストグラムデータを生成するために、例えばので、あなたは何をしたい

を行うべきレコードは、フィールドcol_1を持つケースクラスのインスタンスであるRDD内のデータ行を参照します。

のでhistCol1はキーが1列(COL_1)で一意の値であり、値がそれぞれ一意の値の周波数が

+0

提案いただきありがとうございます。しかし、私はバケツサイズを与える必要があります。最高のバケット10. countByValue()は、ダブルRDDのヒストグラムよりも効率的ですか? –

+0

"bucket size"は実際にはcountByValueによって返されます - 各値はバケットのサイズですが、キーはバケット名です – doug

+0

バケットサイズを1つの値に修正できますか?明確なカウントを考慮する代わりに。私はすべての個体数が必要ではありません。最大バケットのヒストグラムが必要です。 –

関連する問題