を平均化するために0.5秒かかり、トップ100ユーザーのポイント数を平均化するために、次のコードを書いた:スパークは、私は、ユーザーの場所と日付時刻のCSVで約70万行のデータセットをしました100の番号
val spark = org.apache.spark.sql.SparkSession.builder
.appName("Test")
.getOrCreate
import spark.implicits._
val watch = new Stopwatch()
watch.start()
val schema = new StructType().add("user_id", StringType).add("datetime", LongType)
val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile)
df.createOrReplaceTempView("paths")
val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " +
"count(*) as total, max(datetime) - min(datetime) as timeDelta " +
"from paths group by user_id order by total desc limit 100")
pathDs.cache()
pathDs.collect.foreach(println)
println(watch.elapsedTime(TimeUnit.MILLISECONDS))
val avgPoints = pathDs.select(avg("total")).as[Double].head()
println(avgPoints)
println(watch.stop())
私は何百万/何十億ものレコード(最終的にはテラバイトを要するかもしれません)を取って、それらを5列の100レコードに集約しています。問題は、この部分がどれくらいの時間を取るか、あるいはスピードを上げる方法ではなく、結果として得られる100のレコードで作業するときに起こることです。
SQLを使ってこれを簡単に行う方法もありますが、後でさらに処理するにはpathDSも必要です。コードはうまく動作しますが、pathDs.select(avg("total")).as[Double].head()
は多くの作業を開始し、pathDSには100行しか含まれていませんが、0.5秒ほどかかることに気付きました。
これが長引く理由を知っていますか?これをスピードアップするにはどうすればよいでしょうか。具体的には、この小さなデータセットを100行しか操作しないでください。私は具体的には.cacheと.collectを使って100個のレコードをすべてローカルに持ち出す前に集計を行います(とにかくローカルで実行しています)。
私はScala 2.11でSpark 2.2をローカルで使用しています。
私はここで小さなデータを処理しません - 完全なデータセットは数百ギガバイト/ TBになります。しかし、具体的な例がなぜ遅いのかを具体的に見たいのですが、.cache()を実行すると100行のデータセットがメモリに保持され、その結果が瞬時に出力されるはずだと思ったためです。 – kozyr
SparkUIの高度なメトリックをチェックして、なぜそれが長引くのかを理解することをお勧めします。また、このコードを使って少なくとも1つのデータセットを実行することをお勧めします。たとえば、1000の値を使用すると、一定のオーバーヘッドと実際の計算時間(O(n))をより正確に推定できます。 –