2017-08-24 3 views
3

を超える異なるカウント私はウィンドウの上COUNTDISTINCTをやってみましたし、このエラーを得た:pyspark:ウィンドウ

AnalysisException: u'Distinct window functions are not supported: count(distinct color#1926) 

はpysparkでウィンドウの上に明確な数を行う方法はありますか?ここで

は、いくつかのサンプルコードです。

from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"), 
        (13, "2017-03-15T12:27:18+00:00", "red"), 
        (25, "2017-03-18T11:27:18+00:00", "red")], 
        ["dollars", "timestampGMT", "color"]) 

df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w)) 

df.show() 

これは私が見てみたい出力されます:

+-------+--------------------+------+---------------------------------------+ 
|dollars|  timestampGMT| color|distinct_color_count_over_the_last_week| 
+-------+--------------------+------+---------------------------------------+ 
|  17|2017-03-10 15:27:...|orange|          1| 
|  13|2017-03-15 12:27:...| red|          2| 
|  25|2017-03-18 11:27:...| red|          1| 
+-------+--------------------+------+---------------------------------------+ 

答えて

8

私は私がcollect_setとサイズ機能の組み合わせを使用することができることを考え出しましたウィンドウの上にCOUNTDISTINCTの機能を模倣する:

from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

#create some test data 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"), 
        (13, "2017-03-15T12:27:18+00:00", "red"), 
        (25, "2017-03-18T11:27:18+00:00", "red")], 
        ["dollars", "timestampGMT", "color"]) 

#convert string timestamp to timestamp type    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

#use collect_set and size functions to perform countDistinct over a window 
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w))) 

df.show() 

これは色oの明確な数になり前週のレコード:

+-------+--------------------+------+---------------------------------------+ 
|dollars|  timestampGMT| color|distinct_color_count_over_the_last_week| 
+-------+--------------------+------+---------------------------------------+ 
|  17|2017-03-10 15:27:...|orange|          1| 
|  13|2017-03-15 12:27:...| red|          2| 
|  25|2017-03-18 11:27:...| red|          1| 
+-------+--------------------+------+---------------------------------------+