2016-10-29 10 views
4

Sparkを使用する1.6.1 UDFが呼び出された回数を呼び出したい。私は非常に高価なUDF(1コールあたり1秒)とを持っているので、私はこれをやりたいと思っています。私のスパークジョブを必要以上に遅くしてUDFをデータフレームのレコード数よりも頻繁に呼び出すと思われます。SparkでUDFの呼び出しを呼び出す

この状況を再現できませんでしたが、UDFへの呼び出し回数が行数よりも多少異なるように見えるという簡単な例がありました。

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 

object Demo extends App { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("Demo") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("WARN") 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 


    val callCounter = sc.accumulator(0) 

    val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value") 

    println(df.count) // gives 10000 

    val myudf = udf((d:Int) => {callCounter.add(1);d}) 

    val res = df.withColumn("result",myudf($"value")).cache 

    println(res.select($"result").collect().size) // gives 10000 
    println(callCounter.value) // gives 9941 

} 

アキュムレータを使用してUDFのカウントを呼び出す方法が適切でない場合、どうすればいいですか?

注:実際のSpark-Jobでは、実際のレコード数よりも約1.7倍高いコールカウントを取得します。

+0

私はあなたの同じコードを試しました。callcounterとして10000を印刷しました。すべてのprintlnが同じ番号を印刷しています。私はspark 2.0を使用しています。 – Shankar

+0

:masterを 'local [ *] 'の代わりに、私は地元の印刷を正しく試してみました。私は地元の[*]で試したとき、10000の代わりに9996を印刷していました – Shankar

+0

このような場合にアキュムレータを使用すると、既知の問題ですか?私たちが地元の[*]を使うとき、なぜそれを正しく数えないのですか? – Shankar

答えて

1

sparkアプリケーションでは、scala.Appを拡張する代わりにmain()メソッドを定義する必要があります。 scala.Appのサブクラスが正しく動作しないことがあります。

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 

object Demo extends App { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]") 
     val sc = new SparkContext(conf) 
     // [...] 
    } 
} 

これは問題を解決するはずです。

+0

これは問題を解決してくれてありがとうございます。しかし、私の実際のアプリケーションでは、私は主な方法(私たちはスパークノートを使用していません)がありません。たぶん私は小さな事例でこの振る舞いを再現することができますが、新しい質問をします。要するに、私は2つのデータフレームに参加し(標準の内部結合)、withColumnを使って私のudfを呼び出します。 udfは同じ行に対して複数回呼び出されます –

+0

私はスパークノートがどのように正直に働くか分かりません。私は時間があるときにそれを見てみる必要があります。 – eliasah

+0

ここに新しい質問があります:http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns –

関連する問題