2016-08-29 2 views
1

DataFrameの場合、udfdf.withColumn("newCol", myUDF("someCol"))とすると、何らかの操作で簡単に新しい列を生成することができます。 Datasetにこのような何かを行うには、私はmap機能を使用することだろうと思います。データセット上のマップ関数は、1つの列の操作に最適化されていますか?

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 

あなたは、関数への入力として、ケース全体のクラスTを渡す必要があります。 Dataset[T]に多くのフィールド/列がある場合は、Tの多くの列の1つを操作して余分な列を1つ作成したければ、行全体を渡すのは非常に非効率的です。私の質問は、これを最適化できるほどスマートなのですか?

答えて

0

私はどこでも応答を見つけることができなかったので、私は自分自身を考えました。

scala> val plan = dfX.queryExecution.optimizedPlan 

SerializeFromObject [input[0, int, true] AS value#8] 
    +- MapElements <function1>, obj#7: int 
     +- DeserializeToObject newInstance(class A), obj#6: A 
      +- LocalRelation [x#2, y#3]  

DeserializeToObjectplan.toJSONより冗長によると:今、私たちは、次の最適化計画を取得するかどうかをチェックし

scala> case class A(x: Int, y: Int) 
scala> val dfA = spark.createDataset[A](Seq(A(1, 2))) 
scala> val dfX = dfA.map(_.x) 

のは、複数のフィールドを持つケースクラスを含むデータセットを持ってみましょうステップは、xyの両方が存在することを前提としています。

たとえば、Aのフィールドに直接触れるのではなく、反射を使用する次のスニペットを使用しても問題はありません。

val dfX = dfA.map(
    _.getClass.getMethods.find(_.getName == "x").get.invoke(x).asInstanceOf[Int] 
) 
0

これを最適化することができるように十分にスマート触媒ですか?

TL; DR号はSPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressionsを参照されたいです。

現在、Spark SQLのCatalyst Optimizerは、あなたがScalaコードで何をするかを知る方法はありません。 SPARK-14083を引用

:データセットのAPIの

一つの大きな利点は、ユーザー定義の閉鎖/ラムダに大きく依存するパフォーマンスを犠牲にし、型の安全性です。これらのクロージャは、式(既知のデータ型、仮想関数呼び出しなど)を最適化する柔軟性があるため、通常は式よりも遅いです。多くの場合、実際にはこれらのクロージャのバイトコードを調べて、何をしようとしているのか把握することは非常に難しくありません。それらを理解できれば、より最適化された実行のためにそれらを直接Catalyst式に変換することができます。

と述べたとしても、あなたのケースがあります:あなたはそれがまだ開いて見ることができると私は誰もが、現在この上で動作疑うとして表現col("name")

df.map(_.name) //同等の。

関連する問題