2015-10-05 4 views
8

私はFooクラスのRDDを持っています:class Foo(name : String, createDate : Date)。 10%の年齢の別のRDDがFooになります。 私の最初のアイデアは、createDateでソートし、0.1 *カウントで制限することでしたが、制限機能はありません。スパークでRDDとリミットをソートする方法は?

アイデアはありますか? Fooを仮定

答えて

14

は、このような場合クラスである:

import java.sql.Date 
case class Foo(name: String, createDate: java.sql.Date) 
  1. 普通RDDSを用いた:

    import org.apache.spark.rdd.RDD 
    import scala.math.Ordering 
    
    val rdd: RDD[Foo] = sc 
        .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"), 
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23"))) 
        .toDF("name", "createDate") 
        .withColumn("createDate", $"createDate".cast("date")) 
        .as[Foo].rdd 
    
    rdd.cache() 
    val n = scala.math.ceil(0.1 * rdd.count).toInt 
    
    • データは、ドライバメモリに収まる:

        あなたが望む
      • と割合は、あなたがしたい

        rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime)) 
        // Array[Foo] = Array(Foo(a,2009-11-23)) 
        
      • 割合が比較的大きい比較的小さい:

        rdd.sortBy(_.createDate.getTime).take(n) 
        
    • そう

      rdd 
          .sortBy(_.createDate.getTime) 
          .zipWithIndex 
          .filter{case (_, idx) => idx < n} 
          .keys 
      
  2. DataFrameを使用する(メモ:これは実際には制限動作のために最適なパフォーマンスではありません)。

    import org.apache.spark.sql.Row 
    
    val topN = rdd.toDF.orderBy($"createDate").limit(n) 
    topN.show 
    
    // +----+----------+ 
    // |name|createDate| 
    // +----+----------+ 
    // | a|2009-11-23| 
    // +----+----------+ 
    
    
    // Optionally recreate RDD[Foo] 
    topN.map{case Row(name: String, date: Date) => Foo(name, date)} 
    
+1

こんにちはzero323 DATAFRAMEのパフォーマンスが制限動作に最適ではないなぜあなたは本当に速い伝えることができますか?実装上の賢明なRDD上の違いとは何ですか? @ zero333 –

+0

@ XinweiLiu私はすでにあなたの質問に対する答えを提供しました。私はそれが何が起こっているのかを願っています。 – zero323

+1

素晴らしい答え@ zero323。しかし、私はまだ劉偉が持っているのと同じ質問があります。なぜdf.limit()が遅いのですか? – guilhermecgs

関連する問題