2

を平均化するために0.5秒かかり、トップ100ユーザーのポイント数を平均化するために、次のコードを書いた:スパークは、私は、ユーザーの場所と日付時刻のCSVで約70万行のデータセットをしました100の番号

val spark = org.apache.spark.sql.SparkSession.builder 
    .appName("Test") 
    .getOrCreate 

import spark.implicits._ 

val watch = new Stopwatch() 
watch.start() 
val schema = new StructType().add("user_id", StringType).add("datetime", LongType) 

val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile) 
df.createOrReplaceTempView("paths") 

val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " + 
    "count(*) as total, max(datetime) - min(datetime) as timeDelta " + 
    "from paths group by user_id order by total desc limit 100") 

pathDs.cache() 
pathDs.collect.foreach(println) 
println(watch.elapsedTime(TimeUnit.MILLISECONDS)) 
val avgPoints = pathDs.select(avg("total")).as[Double].head() 
println(avgPoints) 
println(watch.stop()) 

私は何百万/何十億ものレコード(最終的にはテラバイトを要するかもしれません)を取って、それらを5列の100レコードに集約しています。問題は、この部分がどれくらいの時間を取るか、あるいはスピードを上げる方法ではなく、結果として得られる100のレコードで作業するときに起こることです。

SQLを使ってこれを簡単に行う方法もありますが、後でさらに処理するにはpathDSも必要です。コードはうまく動作しますが、pathDs.select(avg("total")).as[Double].head()は多くの作業を開始し、pathDSには100行しか含まれていませんが、0.5秒ほどかかることに気付きました。

これが長引く理由を知っていますか?これをスピードアップするにはどうすればよいでしょうか。具体的には、この小さなデータセットを100行しか操作しないでください。私は具体的には.cacheと.collectを使って100個のレコードをすべてローカルに持ち出す前に集計を行います(とにかくローカルで実行しています)。

私はScala 2.11でSpark 2.2をローカルで使用しています。

+0

私はここで小さなデータを処理しません - 完全なデータセットは数百ギガバイト/ TBになります。しかし、具体的な例がなぜ遅いのかを具体的に見たいのですが、.cache()を実行すると100行のデータセットがメモリに保持され、その結果が瞬時に出力されるはずだと思ったためです。 – kozyr

+0

SparkUIの高度なメトリックをチェックして、なぜそれが長引くのかを理解することをお勧めします。また、このコードを使って少なくとも1つのデータセットを実行することをお勧めします。たとえば、1000の値を使用すると、一定のオーバーヘッドと実際の計算時間(O(n))をより正確に推定できます。 –

答えて

3

スパークは大きなデータセットに対して最適化します。これは、大きなデータセットではごくわずかですが、小さなデータセットでは無視できないオーバーヘッドがしばしばあることを意味します。

あなたがavgPointsを計算し実行したときに何が起こるかを考えてみましょう:

  1. スパークは、「変換」を算出し、すなわち、それは計算が(これは選択し、平均などの一部である)何をすべきかを定義します。
  2. "head"アクションを呼び出して、sparkがあなたが作った表現ツリーを取り出し、それを物理プランに変換させます。これには、最適化と複数の可能なソリューションの比較が含まれます。この式には、キャッシュされた部分を計算する式も含まれています。実際には、これらのステップはスキップされます(スパークUIではこれを見ることができます)が、スパークが特定のケースでキャッシュデータの一部を再計算することを決定するかもしれないと考えられています。
  3. スパークは、ステージ全体のコード生成を使用して物理プランをコードにコンパイルし、このコードをシリアル化して、すべての関連するエグゼキュータに送信します。
  4. sparkが計画を作成したとき、それはデータを分割しました(おそらくグループのデフォルトである200個のパーティション)。これは、エグゼキュータの間で200のタスクを分割したことを意味します。ほとんどのパーティションには0または1の要素があるので、それらのタスクはほとんど直ちに実行されますが、スパークは200のタスクを開始する必要があります。
  5. Sparkは、200個のタスクのそれぞれの結果をバッファに送信し、それらはすべて最終的な集約を行うために1つのエグゼキュータに送信されます。最終集約タスクは、すべてのタスクが終了してデータを送信するまで開始されません。
  6. 最終的な集約が完了すると、結果がドライバに返されます。

ご覧のとおり、ネットワークの送信や開始/終了のタスク(管理が必要)など、多くの段階があります。ここでのオーバーヘッドは、実際のデータがなくても0.5秒に簡単に達することができます。

制限を1000に変更すると、データの10倍を処理しているにもかかわらず、全体的な時間の変化はほとんど見られません。

問題のサイズを小さくするためにsparkを使用するのが一般的なケースです。つまり、データが大量にあり、集約を行い、要素数を少なくすると(あなたの場合は100)、それらを収集します(例えば、あなたのケースでは、printlnを使ってforeachを実行するのではなく、集計の結果を保存して、それらを合計するだけで)オーバーヘッドを避けるためにsparkを使用するのではなく、

pathDsを計算するときにできることは、合体(1)です。これは、パーティションが1つだけであることを意味します(すべての結合は最初の段階の一部になります)。これは、より大きなサイズに制限を変更したい場合、1つの値ではなく小さい値に合体すると便利な場合があります(たとえば、10000の制限を行い、次にまだいくつかの並列性を得る)。コメントの制限の結果を基に

更新

現在、1つのパーティションですので、合体あなたはデータフレームの機能を使用する場合を除き、(それはまた、収集を行うにはない本当の理由がないことを意味する助けにはなりません結果に)。上記のプロセスは、複数のパーティションの代わりにただ1つのパーティションが使用されることを除いて、依然として正確です。

+0

最後の段落を削除することをお勧めします。特にpathDsには常に1つのパーティションがあることを考えると、これはまったく役に立たない。 – zero323

+0

@ zero323これは常に真ですか?パーティションの数は実際の制限のサイズとパーティションあたりのキーの数に依存しませんか?しかし、それはのように思えなかった - –

+0

だから私のコードでは、私は、特にこれが実際にドライバに結果をダウンさせるだろうと考えて、平均値を計算する前に pathDs.cache pathDs.collect.foreach(のprintln) をしましたそれが起こったのです。ここで収集する適切な方法は何ですか? 収集されたデータセットを収集して操作すると、ローカルのScalaオブジェクトで作業しているので、結果がはるかに高速になりますが、収集し、印刷してから、パスコードで)、それはまだ0.5秒になります。キャッシュ/収集を行うより良い方法はありますか? – kozyr

0

これを最適化する方法の1つは、データセット全体をメモリに入れた関数collectを使用して、正規のスカラ演算を使用して、おそらく1〜2ミリ秒で完了させることができますか?しかし、それは最初の場所でスパークを使用する理由に反します。

Sparkの強みは、異なるマシン上の複数のノード間で効率的に分散計算を実行することです。小規模なデータセットに対する操作は、Sparkを通過させないと、より効率的になります。あなたは100m飛行しなければならないタイミング747に似ています。今あなたはなぜ飛行機があなたをとても素早く動かすと誰もが言うと747がとても遅いのだろうと思っています。

RDDを使用したSparkでの旧バージョンの作業では、バージョン1.2 - > 1.6付近で、火花のオーバーヘッドを避けるために、mapPartitionsWithIndexのような関数を使用してパーティションデータに対して通常のスケーラ演算を実行できます。これはもちろん、その機能内ではすべてのデータが既にスパークノードレベルで分離されていました。このアプローチを使用すると、両方の世界の利益を得ることができます。

+0

私は自分のデータセットで収集を実行します。また、私も.cache()を実行します。 – kozyr

+0

また、小規模なデータセットでは動作しません。これは実稼働中のTBになります。ここで平均100個の数値が何らかの形で私の初期データセットに依存する場合は、非常に厄介な0.5秒以上の長い。私。これが私のユースケースだと仮定します。テラバイトのGPSポイントを取り、トップ100パスを獲得し、それらの平均値を計算します。 – kozyr

-2
  1. cacheにデータをロードするときの時間が余裕であるため、cacheなしで測定してください。より速くなる可能性があります。
  2. input dataparquetに入力して、同じクラスター上のalluxioなどのメモリストレージにロードできますか? 「はい」の場合はpartitionです。これはuser_idです。理想的には、新しい入力データがkafkaにプッシュされ、structured streamingジョブがalluxioまたはcassandraに追加され、別のものが選択された範囲で集計される設計アーキテクチャです。また、flinkbatchまたはstreamのいずれかを入力してください。これは一般的に高速です。あなたが入力されたデータ構造を制御することができない場合

、その後、所与の2dn段階に焦点を当て、そのようtyped aggregatesを使用してみてください:

case class Input(userId: String, time: DateTime) 

val ds = spark.read.format("csv").option("header", 
"true").schema(schema).csv(inputFile).as[Input] 

ds.groupByKey(_.userId).avg(_.time).show 

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, 
    T] 

は、あなたのようにcsvのデータセットを作成する必要があります

型付きのパフォーマンス上のメリットのため、大きなデータセットの方がはるかに高速ですが、より小さいものではない可能性があります。

+0

事実、ここでは2つの段階があります - 1つは多くのデータを読み込み(どれくらい時間がかかりません)、それを100レコードに集約します。次に、2番目のステージでは、これらの100レコードから1つの列の平均を計算します。それは0.5秒かかりますが、これは狂った速度です。 – kozyr

+0

これは私にはパフォーマンス上のメリットがありませんし、私が見ている状況を実際に説明するとは思えません。問題のある小さなデータセットです。 – kozyr

+0

_itは、型付きのパフォーマンスの利点のために大きなデータセットの方が間違いなく速くなります - それは間違いなくありません:) – zero323

関連する問題