3

通常、グループ内のすべての行は集約関数に渡されます。私は条件を使用して行をフィルタリングして、グループ内のいくつかの行だけが集合関数に渡されるようにしたいと思います。このような操作はPostgreSQLで可能です。 Spark SQL DataFrame(Spark 2.0.0)でも同じことをしたいと思います。spark sqlを使用して特定の集計の行をフィルタリングする方法は?

コードは、おそらく次のようになります。

val df = ... // some data frame 
df.groupBy("A").agg(
    max("B").where("B").less(10), // there is no such method as `where` :(
    max("C").where("C").less(5) 
) 

そのため、このようなデータフレームのために:

| A | B | C | 
| 1| 14| 4| 
| 1| 9| 3| 
| 2| 5| 6| 

結果は次のようになります。

|A|max(B)|max(C)| 
|1| 9|  4| 
|2| 5| null| 

持つことが可能ですスパークSQL?

一般に、max以外の集計関数を使用することができ、任意のフィルタリング条件を使用して同じ列に複数の集計が存在する可能性があることに注意してください。

+0

I '最初に、あなたの限界を超えるすべての値をnullまたはNaNで置き換えると、私はgroupByとaggregateを行います。 –

+0

これはこの特定のケースでは機能しますが、異なるフィルタリング条件で同じ列に複数の集約がある場合は機能しません。 –

答えて

0
>>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF() 
    >>> df.registerTempTable('t') 
    >>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a") 

    +---+---+----+ 
    | a| mb| mc| 
    +---+---+----+ 
    | 1| 9| 3| 
    | 2| 5|null| 
    +---+---+----+ 

あなたは(私はあなたがPostgresの中で同じことを行うと考えている?)SQLを使用することができます

0
df.groupBy("name","age","id").agg(functions.max("age").$less(20),functions.max("id").$less("30")).show(); 

サンプルデータ:

name age id 
abc  23 1001 
cde  24 1002 
efg  22 1003 
ghi  21 1004 
ijk  20 1005 
klm  19 1006 
mno  18 1007 
pqr  18 1008 
rst  26 1009 
tuv  27 1010 
pqr  18 1012 
rst  28 1013 
tuv  29 1011 
abc  24 1015 

出力:

+----+---+----+---------------+--------------+ 
|name|age| id|(max(age) < 20)|(max(id) < 30)| 
+----+---+----+---------------+--------------+ 
| rst| 26|1009|   false|   true| 
| abc| 23|1001|   false|   true| 
| ijk| 20|1005|   false|   true| 
| tuv| 29|1011|   false|   true| 
| efg| 22|1003|   false|   true| 
| mno| 18|1007|   true|   true| 
| tuv| 27|1010|   false|   true| 
| klm| 19|1006|   true|   true| 
| cde| 24|1002|   false|   true| 
| pqr| 18|1008|   true|   true| 
| abc| 24|1015|   false|   true| 
| ghi| 21|1004|   false|   true| 
| rst| 28|1013|   false|   true| 
| pqr| 18|1012|   true|   true| 
+----+---+----+---------------+--------------+ 
+0

これは実際に元の質問に答えるものではありません。これは、以前にフィルタリングするのではなく、集計後に追加の演算子を追加するだけです。 –

4
val df = Seq((1,14,4),(1,9,3),(2,5,6)).toDF("a","b","c") 

val agg = df.groupBy("a").agg(max(when($"b" < 10, $"b")).as("MaxB"), max(when($"c" < 5, $"c")).as("MaxC")) 

agg.show 
+1

あなたがここで何をしているのかを説明するといいでしょう – MZaragoza

関連する問題