2017-09-13 1 views
0

私はpysparkで助けが必要な複雑なwinodwing操作をしています。集計内の集計が必要なグループ化データに対して、pysparkでウィンドウ関数を適用する方法はありますか?

私はsrcdestによってグループ化されたいくつかのデータを持っている、と私はグループごとに次の操作を実行する必要があります。 - は、このグループ内のすべての行について(socket1には表示されませんsocket2の金額を持つ行のみを選択します) は - そのフィルタリング基準を適用した後、合計はamountsフィールド

amounts  src dest socket1 socket2 
10   1  2   A  B 
11   1  2   B  C 
12   1  2   C  D 
510   1  2   C  D 
550   1  2   B  C 
500   1  2   A  B 
80   1   3   A  B 

の金額及びIは、以下の方法でそれを集約したい:
512 + 10 = 522、および80は、SRのための唯一の記録でありますHow to write Pyspark UDAF on multiple columns?

+1

あなたはすでに何を試してみましたか?あなたの実験を共有できますか? – Mariusz

+0

はい、1秒は – guimption

答えて

3

あなたは2つのデータフレームにsocket1と1とsocket2と他のものを、あなたのデータフレームを分割し、その後leftantiが結合を使用することができます:私はここからサンプルデータを借用し、C = 1とdest = 3

amounts  src dest  
522   1  2  
80   1  3  

フィルタリングの代わりに(spark >= 2.0のために働く)。

df = spark.createDataFrame(
    sc.parallelize([ 
     [10,1,2,"A","B"], 
     [11,1,2,"B","C"], 
     [12,1,2,"C","D"], 
     [510,1,2,"C","D"], 
     [550,1,2,"B","C"], 
     [500,1,2,"A","B"], 
     [80,1,3,"A","B"] 
    ]), 
    ["amounts","src","dest","socket1","socket2"] 
) 

そして今、データフレームを分割する:

まず者は、データフレームを作成してみましょう

スパーク> = 2.0

df1 = df.withColumnRenamed("socket1", "socket").drop("socket2") 
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1") 
res = df2.join(df1, ["src", "dest", "socket"], "leftanti") 

スパーク1.6

df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1") 
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1") 
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1") 

そして最後に集約:

import pyspark.sql.functions as psf 
res.groupBy("src", "dest").agg(
    psf.sum("amounts").alias("amounts") 
).show() 

    +---+----+-------+ 
    |src|dest|amounts| 
    +---+----+-------+ 
    | 1| 3|  80| 
    | 1| 2| 522| 
    +---+----+-------+ 
+1

と1kを+1してください:) @Marie – Prem

+1

ハハありがとう@Prem – MaFF

関連する問題