2016-05-19 10 views
0

NamenodeとResourceManager用にHAを備えた3ノードhadoopクラスタをセットアップしました。 NameNodeマシンの1つにSpark Job Serverもインストールしました。Spark Job Server経由でジョブを実行しています

私は、WordCount ExampleやLongPi Jobのような実行中のジョブサーバーテストの例をテストしましたが、問題なく完璧に動作します。また、Spark Job Server経由で結果を読み出すために、リモートホストからcurlコマンドを発行することもできます。

しかし、私は火花ジョブサーバー/ジャーに "火花例-1.6.0-hadoop2.6.0.jar" をアップロードし、それが失敗したSparkPiジョブを実行しようとした、また

[[email protected] lib]$ curl -d "" 'ptfhadoop01v:8090/jobs?appName=SparkPi&classPath=org.apache.spark.examples.SparkPi' 
{ 
    "status": "ERROR", 
    "result": { 
    "message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/ece2be39-org.apache.spark.examples.SparkPi#-630965857]] after [10000 ms]", 
    "errorClass": "akka.pattern.AskTimeoutException", 
    "stack":["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)", "akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)", "scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)", "akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)", "akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)", "akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)", "java.lang.Thread.run(Thread.java:745)"] 
    } 

私は手動でSparkPi.scalaジョブを/ usr/local/hadoop/spark-jobserver/job-server-tests/src/spark.jobserverに配置し、SBTを使用してパッケージを構築しようとしましたが、同じエラーが発生します。私はそれがによって働いて得ることができた

バージョン情報

[[email protected] spark.jobserver]$ sbt sbtVersion 
[info] Set current project to spark-jobserver (in build file:/usr/local/hadoop/spark-jobserver/job-server-tests/src/spark.jobserver/) 
[info] 0.13.11 

Spark Version - spark-1.6.0 
Scala Version - 2.10.4 

このエラーを取り除くと、出力を取得取得する方法上の任意の提案火花例のjarファイルから

答えて

0
package spark.jobserver 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.math.random 

/** Computes an approximation to pi */ 
object SparkPi extends SparkJob { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("SparkPi") 
    val sc = new SparkContext(conf) 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Pi is roughly " + results) 
} 

    override def validate(sc: SparkContext, config: Config):SparkJobValidation = { 
SparkJobValid 
    } 

    override def runJob(sc: SparkContext, config: Config): Any = { 
    val slices = if (args.length > 0) args(0).toInt else 2 
    val n = math.min(100000L * slices, Int.MaxValue).toInt 
    val count = sc.parallelize(1 until n, slices).map { i => 
    val x = random * 2 - 1 
    val y = random * 2 - 1 
    if (x*x + y*y < 1) 1 else 0 
    }.reduce(_ + _) 

(4.0 * count/n) 
    } 

} 

コードを変更してSparkJobを拡張する 清算をありがとう

関連する問題