2016-12-14 14 views
3

テーブルを行の作成順序で除外する一般的な使用例があります。Spark Sql Dedup Rows

たとえば、ユーザーアクションのイベントログがあります。ユーザーは自分の好きなカテゴリを時折表示します。 分析段階では、ユーザーの最後にお気に入りのカテゴリのみを知りたいと考えています。

例データ:

id action_type value date 
123 fav_category 1 2016-02-01 
123 fav_category 4 2016-02-02 
123 fav_category 8 2016-02-03 
123 fav_category 2 2016-02-04 

私たちは、日付列に応じてのみ、最新のアップデートを入手したいと思います。

select * from (
    select *, row_number() over (
     partition by id,action_type order by date desc) as rnum from tbl 
) 
where rnum=1; 

しかし、その後、それは部分的マッパー側に集約されていないと我々は減速にシャッフルすべてのデータを取得します:私たちは、のコースSQLでそれを行うことができます。

私はこの問題SPARK-17662でのJiraを掲載しているし、それがより良いSQLスタイルの提案で閉じられました:

select id, 
     action_type, 
     max(struct(date, *)) last_record 
from tbl 
group by id,action_type 

このソリューションは非常にきれいですが、まだ二つの問題があるさ:

  1. これをフィールドの1つがソート可能でない場合にはトリックは機能しません(マップ<>のように)
  2. フローの後半でフィールドの一部のみを選択すると、プッシュダウン述部が最適化されません私たちの流れは、最初から不必要なフィールドを無視します。

最終的に問題#1を解決しても問題#2に悩まされているこのためのUDAFを作成しました。

もっと良い解決策をお持ちの方はいらっしゃいますか?

答えて

3

私たちの現在の解決策を望む人のために。ここでUDAFのためのコードがある - 私たちは私たちは、パッケージorg.apache.spark.sql.typesにあるいくつかの内部機能を使用していたことに気づく:

package org.apache.spark.sql.types 

case class MaxValueByKey(child1: Expression, child2: Expression) extends DeclarativeAggregate { 

    override def children: Seq[Expression] = child1 :: child2 :: Nil 

    override def nullable: Boolean = true 

    // Return data type. 
    override def dataType: DataType = child2.dataType 

    // Expected input data type. 
    override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType) 

    override def checkInputDataTypes(): TypeCheckResult = 
    TypeUtils.checkForOrderingExpr(child1.dataType, "function max") 

    private lazy val max = AttributeReference("max", child1.dataType)() 
    private lazy val data = AttributeReference("data", child2.dataType)() 

    override lazy val aggBufferAttributes: Seq[AttributeReference] = max :: data :: Nil 

    override lazy val initialValues: Seq[Expression] = Seq(
    Literal.create(null, child1.dataType), 
    Literal.create(null, child2.dataType) 
) 

    override lazy val updateExpressions: Seq[Expression] = 
    chooseKeyValue(max, data, child1, child2) 

    override lazy val mergeExpressions: Seq[Expression] = 
    chooseKeyValue(max.left, data.left, max.right, data.right) 

    def chooseKeyValue(key1:Expression, value1: Expression, key2:Expression, value2: Expression) = Seq(
    If(IsNull(key1), key2, If(IsNull(key2), key1, If(GreaterThan(key1, key2), key1, key2))), 
    If(IsNull(key1), value2, If(IsNull(key2), value1, If(GreaterThan(key1, key2), value1, value2))) 
) 

    override lazy val evaluateExpression: AttributeReference = data 
} 

object SparkMoreUDAFs { 
    def maxValueByKey(key: Column, value: Column): Column = 
     Column(MaxValueByKey(key.expr, value.expr).toAggregateExpression(false)) 
} 

と使用方法は次のとおりです。

sqlContext.table("tbl").groupBy($"id",$"action_type") 
      .agg(SparkMoreUDAFs.maxValueByKey($"date", expr("struct(date,*)")).as("s")) 

私はそれが非常にエレガントであるかどうかはわかりませんが、地図側の部分集約を行い、すべての列タイプに対して機能します。さらに、私はこのUDAFもそれ自身で役に立つと思う。

は、キーがソート可能である場合に(ところでなかなか良さそうです)誰か...

0

あなたUDAFが動作するのに役立ちます願っています。それはmax(struct(key, value))でも動作します(そうでない場合は教えてください)。マップは現在注文可能ではありません。私は初期の作業(https://github.com/apache/spark/pull/15970)を行っていますが、より包括的なアプローチが必要な場合があります。

述語をプッシュダウンすることができますか?私はそこで何が起こっているのか知りたい。

+0

もちろん、キーは注文可能でなければなりません - 私たちはそれらを最大限に活用しています。私のUDAFのポイントは、すべての値(非注文を含む)を保持することです。 述語のプッシュダウンについて、 'select id、action_type、max(struct(date、*))をt1からsとして実行していると仮定し、後でフローの中で' select id、 action_type、s.col2'である。プラン・オプティマイザは、他の列の生データを読み取ることさえできません。しかし、私たちは 'max(struct(date、*))'を持っているので、すべての列を読み込み(シャッフル)します。 – uzadude

+0

Ah Ic、maxには値を順序付ける必要もあります。 –

+0

2番目の問題は、ネストされたデータの列プルーニングです。私たちはSpark 2.2のためにこの分野でいくつかの仕事をするつもりです。既にhttps://github.com/apache/spark/pull/16043の作業があります。 –