1

私は毎週データセットを集約する方法を必要としています。ここに私のデータセットがありますApache Spark |特定の時間枠集約

|  date|organization_id|media_package_id|event_uuid | 
+----------+---------------+----------------+-----------+ 
|2016-10-25|    1|    11|  76304d| 
|2016-10-25|    1|    11|  e6285b| 
|2016-10-22|    2|    21|  16c04d| 
|2016-10-22|    2|    21|  17804d| 
|2016-10-22|    2|    21|  18904x| 
|2016-10-21|    2|    21|  51564q| 
|2016-10-07|    4|    98|  12874t| 
|2016-10-05|    4|    98|  11234d| 
+----------+---------------+----------------+-----------+ 

希望の集計結果を得るためにSparkジョブが毎日実行されているとします。そして、集計後のデータセットの上に、たとえば週単位で結果を欲しいと思います。ここで

|  date|organization_id|media_package_id|  count| 
+----------+---------------+----------------+-----------+ 
|2016-10-24|    1|    11|   2| 
|2016-10-17|    2|    21|   4| 
|2016-10-03|    4|    98|   2| 
+----------+---------------+----------------+-----------+ 

は、あなたはそれが私が何とか毎日の集計を行うために管理

(私は最善の方法だと思います)、週の最初の日を取っている日付列を参照してください場合。ここで私はここで

val data = MongoSupport.load(spark, "sampleCollection") 
val dataForDates = data.filter(dataForDates("date").isin(dates : _*)) 

val countByDate = proofEventsForDates.groupBy("DATE", "ORGANIZATION_ID", "MEDIA_PACKAGE_ID") 
    .agg(count("EVENT_UUID").as("COUNT")) 

val finalResult = impressionsByDate 
    .select(
    col("DATE").as("date"), 
    col("ORGANIZATION_ID").as("organization_id"), 
    col("MEDIA_PACKAGE_ID").as("media_package_id"), 
    col("COUNT").as("count") 
) 

をやった方法です、データセットをフィルタリングするために初めに、私は少なくとも約一ヶ月の日付で構成され、特別なdatesリストを渡しています。そして、私は取得していた結果は、以降、私は、このデータセットの集約を毎週取得に見当もつかない(私が欲しいものではありません)ここで

|  date|organization_id|media_package_id|  count| 
+----------+---------------+----------------+-----------+ 
|2016-10-25|    1|    11|   2| 
|2016-10-22|    2|    21|   3| 
|2016-10-21|    2|    21|   1| 
|2016-10-07|    2|    21|   1| 
|2016-10-05|    2|    21|   1| 
+----------+---------------+----------------+-----------+ 

です。

+0

'organization_id = 5'の行はどうなりましたか? – mtoto

+0

@mtoto質問が編集されました。私はちょっとタイプミスだった – Switch

+0

同じ行の期待される出力に基づいて 'media_package_id'は' 21'、いいえ? – mtoto

答えて

3

すでにクラスdateのごdate列をされたと仮定すると、あなたは、集約のために不足しているグループ化列を抽出するための機能year()weekofyear()を使用することができます。

import org.apache.spark.sql.functions.weekofyear 
import org.apache.spark.sql.functions.year 

(df 
    .withColumn("week_nr", weekofyear($"date")) 
    .withColumn("year", year($"date")) 
    .groupBy("year", 
      "week_nr", 
      "organization_id", 
      "media_package_id") 
    .count().orderBy(desc("week_nr"))).show 
+----+-------+---------------+----------------+-----+ 
|year|week_nr|organization_id|media_package_id|count| 
+----+-------+---------------+----------------+-----+ 
|2016|  43|    1|    11| 2| 
|2016|  42|    2|    21| 4| 
|2016|  40|    4|    98| 2| 
+----+-------+---------------+----------------+-----+ 
+0

あなたはgroupBy 'year($" date ")'と 'weekofyear'でグループ化する必要があります。そうでなければ、1年以上のデータがある場合は、一緒に。 –

+0

ありがとうございました! – mtoto

関連する問題