Sparkを使用する1.6.1 UDFが呼び出された回数を呼び出したい。私は非常に高価なUDF(1コールあたり1秒)とを持っているので、私はこれをやりたいと思っています。私のスパークジョブを必要以上に遅くしてUDFをデータフレームのレコード数よりも頻繁に呼び出すと思われます。SparkでUDFの呼び出しを呼び出す
この状況を再現できませんでしたが、UDFへの呼び出し回数が行数よりも多少異なるように見えるという簡単な例がありました。
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo extends App {
val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val callCounter = sc.accumulator(0)
val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")
println(df.count) // gives 10000
val myudf = udf((d:Int) => {callCounter.add(1);d})
val res = df.withColumn("result",myudf($"value")).cache
println(res.select($"result").collect().size) // gives 10000
println(callCounter.value) // gives 9941
}
アキュムレータを使用してUDFのカウントを呼び出す方法が適切でない場合、どうすればいいですか?
注:実際のSpark-Jobでは、実際のレコード数よりも約1.7倍高いコールカウントを取得します。
私はあなたの同じコードを試しました。callcounterとして10000を印刷しました。すべてのprintlnが同じ番号を印刷しています。私はspark 2.0を使用しています。 – Shankar
:masterを 'local [ *] 'の代わりに、私は地元の印刷を正しく試してみました。私は地元の[*]で試したとき、10000の代わりに9996を印刷していました – Shankar
このような場合にアキュムレータを使用すると、既知の問題ですか?私たちが地元の[*]を使うとき、なぜそれを正しく数えないのですか? – Shankar