2016-08-03 4 views
1

私は、pysparkデータフレーム内に、エンジンのイベント「startup」、「shutdown」、および「other」を表す値1、-1および0の列を持っています。 1と-1,2が常に入れ替わる場合ウィンドウ関数による飽和和を設計する

+---+-----+-----+ 
|seq|event|state| 
+---+-----+-----+ 
| 1 | 1 | 1 | 
| 2 | 0 | 1 | 
| 3 | -1 | 0 | 
| 4 | 0 | 0 | 
| 5 | 0 | 0 | 
| 6 | 1 | 1 | 
| 7 | -1 | 0 | 
+---+-----+-----+ 

、これは簡単に行うことができます:それは何かのように、オフだとき、私はエンジンがオンのとき1で、エンジンの状態を持つ列を構築したいと0ウィンドウ関数で、このような

sum('event').over(Window.orderBy('seq')) 

ように、私が私が状態1または0ですでに午前た場合、それぞれ、無視したいいくつかの偽の1秒または-1,2を、持っていること、しかし、発生する可能性があります。私はこのようにような何かを行うことができるようにしたい:私は現時点では想像することはできませんよ1以上または0を下回ることはありません「飽和」SUM関数、または他のいくつかのアプローチが必要になり

+---+-----+-----+ 
|seq|event|state| 
+---+-----+-----+ 
| 1 | 1 | 1 | 
| 2 | 0 | 1 | 
| 3 | 1 | 1 | 
| 4 | -1 | 0 | 
| 5 | 0 | 0 | 
| 6 | -1 | 0 | 
| 7 | 1 | 1 | 
+---+-----+-----+ 

他にもアイデアはありますか?

答えて

0

最後の関数を使用して、最新の状態変更を塗りつぶして、目的の結果を得ることができます。

from pyspark.sql import functions as F 
from pyspark.sql.window import Window 

df = (spark.createDataFrame([ 
     (1, 1), 
     (2, 0), 
     (3, -1), 
     (4, 0) 
    ], ["seq", "event"])) 

w = Window.orderBy('seq') 

#replace zeros with nulls so they can be ignored easily. 
df = df.withColumn('helperCol',F.when(df.event != 0,df.event)) 

#fill statechanges forward in a new column. 
df = df.withColumn('state',F.last(df.helperCol,ignorenulls=True).over(w)) 

#replace -1 values with 0 
df = df.replace(-1,0,['state']) 

df.show() 

これが生成します。

+---+-----+---------+-----+ 
|seq|event|helperCol|state| 
+---+-----+---------+-----+ 
| 1| 1|  1| 1| 
| 2| 0|  null| 1| 
| 3| -1|  -1| 0| 
| 4| 0|  null| 0| 
+---+-----+---------+-----+ 

helperCol

は、データフレームに追加する必要はありません、私は唯一のプロセスをより読みやすくするために、それが含まれています。

関連する問題