2016-03-22 20 views
2

配列の配列を作成したい。これは私のデータテーブルです:Spark SQL - sql関数から配列の配列を生成する

// A case class for our sample table 
case class Testing(name: String, age: Int, salary: Int) 

// Create an RDD with some data 
val x = sc.parallelize(Array(
    Testing(null, 21, 905), 
    Testing("Noelia", 26, 1130), 
    Testing("Pilar", 52, 1890), 
    Testing("Roberto", 31, 1450) 
)) 

// Convert RDD to a DataFrame 
val df = sqlContext.createDataFrame(x) 

// For SQL usage we need to register the table 
df.registerTempTable("df") 

私は整数列 "年齢"の配列を作成したいと思います。そのために私は「collect_list」を使用します。

sqlContext.sql("SELECT collect_list(age) as age from df").show 

をしかし、今、私は上記で作成したとして、複数の配列を含む配列を生成したい:

sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show 

しかし、これは仕事、または関数org.apacheを使用していません.spark.sql.functions.array。何か案は?

答えて

9

[OK]をクリックすると、操作が簡単になりません。エラーメッセージが言うように、あなたが作業している同じデータを検討し、そこ

// A case class for our sample table 
case class Testing(name: String, age: Int, salary: Int) 

// Create an RDD with some data 
val x = sc.parallelize(Array(
    Testing(null, 21, 905), 
    Testing("Noelia", 26, 1130), 
    Testing("Pilar", 52, 1890), 
    Testing("Roberto", 31, 1450) 
)) 

// Convert RDD to a DataFrame 
val df = sqlContext.createDataFrame(x) 

// For SQL usage we need to register the table 
df.registerTempTable("df") 
sqlContext.sql("select collect_list(age) as age from df").show 

// +----------------+ 
// |    age| 
// +----------------+ 
// |[21, 26, 52, 31]| 
// +----------------+ 

sqlContext.sql("select collect_list(collect_list(age),  collect_list(salary)) as arrayInt from df").show 

から段階的に行ってみよう:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...] 

collest_listはただ一つの引数を取ります。ドキュメントhereを調べてみましょう。

実際には1つの議論が必要です!しかし関数オブジェクトのドキュメントをさらに進めてみましょう。配列関数を使うと、ColumnまたはColumnの繰り返しパラメータから新しい配列列を作成することができます。私たちはここから行くん

// +-------------------------------------------------------------------+ 
// |arrayInt               | 
// +-------------------------------------------------------------------+ 
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]| 
// +-------------------------------------------------------------------+ 

sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false) 

配列関数は、前の手の年齢と給与の両方にcollect_listによって作成列リストから実際に列を作成します。それでは、ということ使ってみましょうか?

DataFrameのRowは、Rowでラップされた別のコレクションであることを覚えておく必要があります。

私が最初にやることは、そのコレクションの作業です。では、どうすればWrappedArray[WrappedArray[Int]]を平坦化できますか?

Scalaはあなただけの今、私たちはデータフレームの上にそれを使用することができますので、のは、UDFでラップさせ.flatten

import scala.collection.mutable.WrappedArray 

val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] = 
    sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df") 
    .first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]] 
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] = 
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)) 

firstRow.flatten 
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450) 

を使用する必要がある種類の魔法れる:

def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten 
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]])) 

我々はUDFを登録しているので私たちはsqlContextの中でこれを使うことができます:

sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false) 

// +---------------------------------------+ 
// |arrayInt        | 
// +---------------------------------------+ 
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]| 
// +---------------------------------------+ 

私はこれが助けてくれることを願っています!