2016-07-21 2 views
0

私はmapGroupsを使用して集計を実行しようとしていますが、これはSparseMatrixを列の1つとして返し、列を合計します。Spark DatasetでTypedColumnを作成して操作するにはどうすればいいですか?

マップされた行のスキーマがcase classで、列名を提供しました。行列の列はorg.apache.spark.mllib.linalg.Matrixです。集計(select(sum("mycolumn"))を実行する前にtoDFを実行しないと、1つのタイプの不一致エラー(required: org.apache.spark.sql.TypedColumn[MySchema,?])が発生します。 toDFを含めると、別のタイプの不一致エラー:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDTが表示されます。だからそれを行う正しい方法は何ですか?

答えて

1

ここでは、少なくとも2つの異なる問題で苦労しているようです。 $で暗黙的な変換を使用して

  • o.a.s.sql.functions.colを使用して
    ds.select(col("_1").as[String]) 
    
  • ds.select(col("_1").as[String]) 
    
  • 0を

    val ds = Seq(
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))) 
    ).toDS 
    

    TypedColumnの選択:あなたはこのようDatasetを持っていると仮定しましょう

追加行列:

  • MLLib MatrixMatrixUDTは加算を実装していません。それはあなたがサードパーティの線形代数ライブラリを使用することができますsum機能することができるか、+
  • を低下させないことを意味しますが、これはあなたが本当にそれをしたい場合は、データセット

スパーク/スパークSQLでサポートされていません。

ds.groupByKey(_._1).mapGroups(
    (key, values) => { 
    val matrices = values.map(_._2.toArray) 
    val first = matrices.next 
    val sum = matrices.foldLeft(first)(
     (acc, m) => acc.zip(m).map { case (x, y) => x + y } 
    ) 
    (key, sum) 
}) 

をと行列に戻ってマップが、個人的に私はちょうどRDDに変換し、breezeを使用します。Datsetsであなたは、このような何かをしようとすることができます。

+0

ありがとうございました。あなたは追加問題の解決法を提案できますか?それが今私が立ち往生している場所です。 – Emre

+0

行列が密であるか疎な行列がありますか?サイズは何ですか? – zero323

+0

かなり小さく疎です。ノードに収まるのに十分小さい。 – Emre

関連する問題