[OK]をクリックすると、操作が簡単になりません。エラーメッセージが言うように、あなたが作業している同じデータを検討し、そこ
// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)
// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))
// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)
// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show
// +----------------+
// | age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+
sqlContext.sql("select collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show
から段階的に行ってみよう:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]
collest_list
はただ一つの引数を取ります。ドキュメントhereを調べてみましょう。
実際には1つの議論が必要です!しかし関数オブジェクトのドキュメントをさらに進めてみましょう。配列関数を使うと、ColumnまたはColumnの繰り返しパラメータから新しい配列列を作成することができます。私たちはここから行くん
// +-------------------------------------------------------------------+
// |arrayInt |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+
:
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)
配列関数は、前の手の年齢と給与の両方にcollect_listによって作成列リストから実際に列を作成します。それでは、ということ使ってみましょうか?
DataFrameのRowは、Rowでラップされた別のコレクションであることを覚えておく必要があります。
私が最初にやることは、そのコレクションの作業です。では、どうすればWrappedArray[WrappedArray[Int]]
を平坦化できますか?
Scalaはあなただけの今、私たちはデータフレームの上にそれを使用することができますので、のは、UDFでラップさせ.flatten
import scala.collection.mutable.WrappedArray
val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] =
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
.first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))
firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)
を使用する必要がある種類の魔法れる:
def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))
我々はUDFを登録しているので私たちはsqlContextの中でこれを使うことができます:
sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)
// +---------------------------------------+
// |arrayInt |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+
私はこれが助けてくれることを願っています!