8
私はFooクラスのRDDを持っています:class Foo(name : String, createDate : Date)
。 10%の年齢の別のRDDがFoo
になります。 私の最初のアイデアは、createDate
でソートし、0.1 *カウントで制限することでしたが、制限機能はありません。スパークでRDDとリミットをソートする方法は?
アイデアはありますか? Foo
を仮定
私はFooクラスのRDDを持っています:class Foo(name : String, createDate : Date)
。 10%の年齢の別のRDDがFoo
になります。 私の最初のアイデアは、createDate
でソートし、0.1 *カウントで制限することでしたが、制限機能はありません。スパークでRDDとリミットをソートする方法は?
アイデアはありますか? Foo
を仮定
は、このような場合クラスである:
import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
普通RDDSを用いた:
import org.apache.spark.rdd.RDD
import scala.math.Ordering
val rdd: RDD[Foo] = sc
.parallelize(Seq(
("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
.toDF("name", "createDate")
.withColumn("createDate", $"createDate".cast("date"))
.as[Foo].rdd
rdd.cache()
val n = scala.math.ceil(0.1 * rdd.count).toInt
データは、ドライバメモリに収まる:
と割合は、あなたがしたい
rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
// Array[Foo] = Array(Foo(a,2009-11-23))
割合が比較的大きい比較的小さい:
rdd.sortBy(_.createDate.getTime).take(n)
そう
rdd
.sortBy(_.createDate.getTime)
.zipWithIndex
.filter{case (_, idx) => idx < n}
.keys
DataFrameを使用する(メモ:これは実際には制限動作のために最適なパフォーマンスではありません)。
import org.apache.spark.sql.Row
val topN = rdd.toDF.orderBy($"createDate").limit(n)
topN.show
// +----+----------+
// |name|createDate|
// +----+----------+
// | a|2009-11-23|
// +----+----------+
// Optionally recreate RDD[Foo]
topN.map{case Row(name: String, date: Date) => Foo(name, date)}
こんにちはzero323 DATAFRAMEのパフォーマンスが制限動作に最適ではないなぜあなたは本当に速い伝えることができますか?実装上の賢明なRDD上の違いとは何ですか? @ zero333 –
@ XinweiLiu私はすでにあなたの質問に対する答えを提供しました。私はそれが何が起こっているのかを願っています。 – zero323
素晴らしい答え@ zero323。しかし、私はまだ劉偉が持っているのと同じ質問があります。なぜdf.limit()が遅いのですか? – guilhermecgs