2017-09-13 3 views
2

私はpysparkのデータフレーム内の次のデータがend_stats_df呼ばれています:複数の列にPyspark UDAFを書き込む方法は?

values  start end cat1 cat2 
10   1  2  A  B 
11   1  2  C  B 
12   1  2  D  B 
510   1  2  D  C 
550   1  2  C  B 
500   1  2  A  B 
80   1  3  A  B 

をそして、私は次のようにそれを集約したい:

  • を私は「開始」と「終了」を使用したいです行のグループごとに集計キー
  • としてカラム、私は次のことを実行する必要があります。
    • 両方01に一意の値の数を計算しますそのグループのとcat2start = 1およびend = 2の場合は、A、B、C、Dがあるため、この数値は4になります。この数値はn(この例ではn = 4)として保存されます。
    • valuesフィールドの場合、各グループについて、valuesをソートして、すべてn-1の値を選択する必要があります。ここで、nは上記の最初の操作から保存された値です。
    • 集計の最後には、上記の操作の後にcat1cat2にあるものは本当に気にしません。

上記の例からの出力例は次のとおりです。

values  start end cat1 cat2 
12   1  2  D  B 
550   1  2  C  B 
80   1  3  A  B 

私はpysparkのデータフレームを使用して実行するにはどうすればよいですか?カスタムUDAFを使用する必要があると思います。

答えて

7

PysparkはUDAFを直接サポートしていませんので、手動で集約する必要があります。

from pyspark.sql import functions as f 

def func(values, cat1, cat2): 
    n = len(set(cat1 + cat2)) 
    return sorted(values)[n - 2] 


df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='\t', header=True) 
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'), 
              f.collect_set(df['cat1']).alias('cat1'), 
              f.collect_set(df['cat2']).alias('cat2')) 
df = df.select(df['start'], df['end'], f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2'])) 
+0

これは素晴らしいことですが、私はそれを試していただき、ありがとうございます。 – Hunle

+0

あなたの例では 'f'とは何ですか? – Hunle

+0

心配しないで、私はそれがpysparkインポートからの "関数"であることがわかります。 – Hunle

関連する問題