私はSparkジョブを実行してデータを集約しています。私は基本的にmutable.HashMap[Zone, Double]
を含むプロファイルと呼ばれるカスタムデータ構造を持っています。私は次のコードを使用して、与えられたキー(UUID)を共有するすべてのプロファイルをマージしたい:Scope of 'spark.driver.maxResultSize'
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
不思議なことに、スパークは、次のエラーで失敗します。
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
明白な解決策をインクリメントすることです"spark.driver.maxResultSize"しかし、2つのことが私を困惑させる。
- 私は、この特定のエラーや設定パラメータをグーグルで検出されたすべてのドキュメントやヘルプを1024.0
- より1024.0大きい取得一致あまりにも多く、それが戻ってドライバーに値をとる関数に影響を与えることを示しています。 (たとえば
take()
またはcollect()
)、私はドライバーには何も服用せず、HDFSから読み込み、集約して、HDFSに保存し直しています。
なぜこのエラーが発生するのですか?
あなたが私の答えを確認することができますか? – mrsrinivas
私はそれをアップヴォートするつもりですが、残念ながら私はもはやそのコード(または会社)へのアクセス権を持っていません。また、あなたの答えは#2の問題を解決するものではありません。 –