2016-09-25 9 views
0

私はPythonに精通しており、Spark-Scalaを学んでいます。Spark-Scalaでは、List of ArraysをDataFrameにコピーする方法は?

私はこの構文でdesribed構造を有するDATAFRAME構築したい: はhttp://spark.apache.org/docs/latest/ml-pipeline.html

現在、私のデータが配列である私が引っ張っていた:私はこのURLから上記の構文を持っ

// Prepare training data from a list of (label, features) tuples. 
val training = spark.createDataFrame(Seq(
    (1.1, Vectors.dense(1.1, 0.1)), 
    (0.2, Vectors.dense(1.0, -1.0)), 
    (3.0, Vectors.dense(1.3, 1.0)), 
    (1.0, Vectors.dense(1.2, -0.5)) 
)).toDF("label", "features") 

をDFのうち:

val my_a = gspc17_df.collect().map{row => Seq(row(2),Vectors.dense(row(3).asInstanceOf[Double],row(4).asInstanceOf[Double]))} 

私の配列の構造は、上記のDFと非常に似ています。

my_a: Array[Seq[Any]] = 
Array(
    List(-1.4830674013266898, [-0.004192832940431825,-0.003170667657263393]), 
    List(-0.05876766500768526, [-0.008462913654529357,-0.006880595828929472]), 
    List(1.0109273250546658, [-3.1816797620416693E-4,-0.006502619326182358])) 

上記の構造のDataFrameに配列からデータをコピーするにはどうすればよいですか?

<console>:105: error: inferred type arguments [Seq[Any]] do not conform to method createDataFrame's type parameter bounds [A <: Product] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
         ^
<console>:105: error: type mismatch; 
found : scala.collection.mutable.WrappedArray[Seq[Any]] 
required: Seq[A] 
     val my_df = spark.createDataFrame(my_a).toDF("label","features") 
             ^
scala> 

答えて

4

ここでの最初の問題は、あなたが行データを保存するためにListを使用することである:私に吠え

val my_df = spark.createDataFrame(my_a).toDF("label","features") 

スパーク:

私はこの構文を試してみました。リストは均質なデータ構造であり、 Anyrow(2))と DenseVectorの唯一の共通タイプは AnyObject)なので、最終的には Seq[Any]となります。

次の問題は、row(2)をまったく使用することです。 は実質的にAnyのコレクションなので、この操作では有用な型は返されず、Encoderを明示せずにDataFrameに結果を格納できませんでした。

スパークッシュの観点からは、どちらも良いアプローチではありません。 collect - データを変換するだけでは、コメントやコメントは必要ありません。 をマッピングしてVectorsを作成するだけでは意味がありません。

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array(df.columns(3), df.columns(4))) 
    .setOutputCol("features") 

assembler.transform(df).select(df.columns(2), "features") 

またはあなたが本当に手動UDFこれを処理する場合:あなたはVectorAssemblerを使用することができます何の型の不一致が存在しないと仮定すると

val toVec = udf((x: Double, y: Double) => Vectors.dense(x, y)) 

df.select(col(df.columns(2)), toVec(col(df.columns(3)), col(df.columns(4)))) 

一般に、Sparkで使用する前にScalaをよく知っておくことを強くおすすめします。

関連する問題