2015-09-04 23 views
9

説明データフレームdf

SparkとSparkSQL:ウィンドウ関数を模倣する方法は?

id |  date 
--------------- 
1 | 2015-09-01 
2 | 2015-09-01 
1 | 2015-09-03 
1 | 2015-09-04 
2 | 2015-09-04 

私はランニングカウンタまたはインデックスを作成したいを考える

同じIDでグループ化された
  • し、そのグループに日付順に並べ替えられ

したがって、

id |  date | counter 
-------------------------- 
1 | 2015-09-01 |  1 
1 | 2015-09-03 |  2 
1 | 2015-09-04 |  3 
2 | 2015-09-01 |  1 
2 | 2015-09-04 |  2 

これは私がウィンドウ機能で達成できるものです。

val w = Window.partitionBy("id").orderBy("date") 
val resultDF = df.select(df("id"), rowNumber().over(w)) 

残念ながら、スパーク1.4.1は、通常のデータフレームのためにウィンドウ関数をサポートしていません。

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext; 

質問私はせずに、現在のスパーク1.4.1で上記の計算を達成するにはどうすればよい

  • ウィンドウ関数を使う?
  • 通常のデータフレームのウィンドウ機能はいつSparkでサポートされますか?

ありがとう!

+0

データフレームとSQLを使用する必要がありますか、RDDを使用できますか?これは、groupByメソッドでは非常に簡単です。 –

+0

@ KirkBroadhurst:RDDも大丈夫でしょう。ちょっとしたコードの抜粋であなたのアイデアをスケッチしてください。 SparkSQLの時点で、私は現在どのようにこれを行う方法を参照してください:あなたはアイデアがありますか? –

答えて

6

これはRDDで行うことができます。個人的には、RDD用のAPIがより意味をなさないことがわかります。私のデータがデータフレームのように「フラット」になることを常に望むとは限りません。

val df = sqlContext.sql("select 1, '2015-09-01'" 
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'") 
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'")) 

// dataframe as an RDD (of Row objects) 
df.rdd 
    // grouping by the first column of the row 
    .groupBy(r => r(0)) 
    // map each group - an Iterable[Row] - to a list and sort by the second column 
    .map(g => g._2.toList.sortBy(row => row(1).toString))  
    .collect() 

上記は、次のような結果が得られます。

Array[List[org.apache.spark.sql.Row]] = 
Array(
    List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
    List([2,2015-09-01], [2,2015-09-04])) 

あなたにも「グループ」内の位置をしたい場合は、zipWithIndexを使用することができます。

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect() 

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
    List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)), 
    List(([2,2015-09-01],0), ([2,2015-09-04],1))) 

あなた FlatMapを使用してオブジェクトの簡単なリスト/アレイにこの背中をフラット化できますが、あなたは素晴らしいアイデアではありません「グループ」には何も実行する必要がある場合。

このようにRDDを使用することの欠点は、DataFrameからRDDに変換して元に戻すのが面倒だということです。

+0

多くの感謝!!!それが私が探していた解決策でした。うーん、私はちょうど 'groupBy'が完了したら、通常のScala' list'オペレーションを実行するのに十分なほど "勇敢"ではありませんでした.... –

+0

"g._2.toList.sortBy"リストに何百万ものもの要素の、私はそれらを収集することができません – halil

7

ローカルDataFramesにもHiveContextを使用することができます。そうしないと非常に良い理由がない限り、それはおそらく良い考えです。 SQLContextspark-shellpysparkシェルで利用できます(今のところsparkRは平文SQLContextと思われます)。そのパーサーはSpark SQL and DataFrame Guideで推奨されています。

import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rowNumber 

object HiveContextTest { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Hive Context") 
    val sc = new SparkContext(conf) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(
     ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil 
    ).toDF("k", "v") 

    val w = Window.partitionBy($"k").orderBy($"v") 
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show 
    } 
} 
3

Sparkバージョン(> =)1.5をお持ちの場合は、DataFramesのウィンドウ機能が使用できます。しかし、あなたが本当に古いバージョン(例えば、1.4。あなたはdfWithCounter.show

をすれば1)、ここではこの

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil) 
      .toDF("id", "date") 

val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup") 
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup") 
         .where($"date"<=$"dateDup") 
         .groupBy($"id", $"date") 
         .agg($"id", $"date", count($"idDup").as("counter")) 
         .select($"id",$"date",$"counter") 

を解決するためのハックの方法がある今、あなたは取得します:

+---+----------+-------+               
| id|  date|counter| 
+---+----------+-------+ 
| 1|2015-09-01|  1| 
| 1|2015-09-04|  3| 
| 1|2015-09-03|  2| 
| 2|2015-09-01|  1| 
| 2|2015-09-04|  2| 
+---+----------+-------+ 

注意をdateがソートされていないが、counterが正しいこと。 whereステートメントの<=>=に変更して、counterの注文を変更することもできます。

関連する問題