2015-09-11 9 views
7

私はSparkジョブを実行してデータを集約しています。私は基本的にmutable.HashMap[Zone, Double]を含むプロファイルと呼ばれるカスタムデータ構造を持っています。私は次のコードを使用して、与えられたキー(UUID)を共有するすべてのプロファイルをマージしたい:Scope of 'spark.driver.maxResultSize'

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1} 
val aggregated = dailyProfiles 
    .aggregateByKey(new Profile(), 3200)(merge, merge).cache() 

不思議なことに、スパークは、次のエラーで失敗します。

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

明白な解決策をインクリメントすることです"spark.driver.maxResultSize"しかし、2つのことが私を困惑させる。

  1. 私は、この特定のエラーや設定パラメータをグーグルで検出されたすべてのドキュメントやヘルプを1024.0
  2. より1024.0大きい取得一致あまりにも多く、それが戻ってドライバーに値をとる関数に影響を与えることを示しています。 (たとえばtake()またはcollect())、私はドライバーには何も服用せず、HDFSから読み込み、集約して、HDFSに保存し直しています。

なぜこのエラーが発生するのですか?

+0

あなたが私の答えを確認することができますか? – mrsrinivas

+0

私はそれをアップヴォートするつもりですが、残念ながら私はもはやそのコード(または会社)へのアクセス権を持っていません。また、あなたの答えは#2の問題を解決するものではありません。 –

答えて

1

Yes, It's failing because The values we see in exception message are rounded off by one precision and comparison happening in bytes.

That serialized output must be more than 1024.0 MB and less than 1024.1 MB.

Apache Sparkのコードスニペットを追加しました。このエラーが発生するのは非常に面白く非常にまれです。 :)

ここでtotalResultSize > maxResultSizeは両方ともLong型であり、値はバイトで保持されます。しかしmsgは、Utils.bytesToString()から丸められた値を保持します。

//TaskSetManager.scala 
    def canFetchMoreResults(size: Long): Boolean = sched.synchronized { 
    totalResultSize += size 
    calculatedTasks += 1 
    if (maxResultSize > 0 && totalResultSize > maxResultSize) { 
     val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + 
     s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + 
     s"(${Utils.bytesToString(maxResultSize)})" 
     logError(msg) 
     abort(msg) 
     false 
    } else { 
     true 
    } 
    } 

Apache Spark 1.3 - source


//Utils.scala 
    def bytesToString(size: Long): String = { 
    val TB = 1L << 40 
    val GB = 1L << 30 
    val MB = 1L << 20 
    val KB = 1L << 10 

    val (value, unit) = { 
     if (size >= 2*TB) { 
     (size.asInstanceOf[Double]/TB, "TB") 
     } else if (size >= 2*GB) { 
     (size.asInstanceOf[Double]/GB, "GB") 
     } else if (size >= 2*MB) { 
     (size.asInstanceOf[Double]/MB, "MB") 
     } else if (size >= 2*KB) { 
     (size.asInstanceOf[Double]/KB, "KB") 
     } else { 
     (size.asInstanceOf[Double], "B") 
     } 
    } 
    "%.1f %s".formatLocal(Locale.US, value, unit) 
    } 

Apache Spark 1.3 - source