2016-08-17 6 views
0

私は内部にRDDの他のシリーズを含むJavaRDDを作成しようとしています。JavaスパークRDDは他のRDDにありますか?

RDDMachine.foreach(machine - > startDetectionNow()) 内部で、マシンはESへのクエリを開始し、他のRDDを取得します。私はこれすべて(1200hits)を集め、リストに隠す。マシンがこのリストで作業を開始した後

まず、これは可能ですか?もしそうでなければ、私は何か違うことをしようとすることができますか?

私がやろうものをお見せしましょう:

 SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local"); 
    conf.set("es.index.auto.create", "true"); 
    conf.set("es.nodes", "IP_ES"); 
    conf.set("es.port", "9200"); 
    sparkContext = new JavaSparkContext(conf); 

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge); 

    Machine m1 = new Machine("AL-27", "IP1", config_algo); 
    Machine m2 = new Machine("AL-20", "IP2", config_algo); 
    Machine m3 = new Machine("AL-24", "IP3", config_algo); 
    Machine m4 = new Machine("AL-21", "IP4", config_algo); 

    ArrayList<Machine> Machines = new ArrayList(); 
    Machines.add(m1); 
    Machines.add(m2); 
    Machines.add(m3); 
    Machines.add(m4); 

    JavaRDD<Machine> machineRDD = sparkContext.parallelize(Machines); 

    machineRDD.foreach(machine -> machine.startDetectNow()); 

私はElasticsearchに位置データから学ばなければならない各マシンで私のアルゴリズムを起動しよう。


public boolean startDetectNow() 


    // MEGA Requete ELK 
    JavaRDD dataForLearn = Elastic.loadElasticsearch(
      Algo.sparkContext 
      , "logstash-*/Collector" 
      , Elastic.req_AvgOfCall(
        getIP() 
        , "hour" 
        , "2016-04-16T00:00:00" 
        , "2016-06-10T00:00:00")); 

    JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn); 
    List<Hit> hits = Elastic.RddToListHits(RDD_hits); 

だから私はすべての「機械」で、クエリのすべてのデータを取得しよう。 私の質問です:それはスパークでこれを行うことは可能ですか?それとも別の方法で? 私はSparkで起動します。コードが2番目のRDDの周りにあるときにロックのようなものになるのは継ぎ目です。

、エラーメッセージは次のとおりです。

16/08/17午前0時17分13秒INFO SparkContext:ジョブ開始:Elastic.java:94 16/08/17午前0時17分13秒で収集INFO DAGScheduler:1つの出力パーティションでジョブ1(Elastic.java:94で収集)を取得 16/08/17 00:17:13 INFO DAGScheduler:最終ステージ:ResultStage 1(Elastic.java:94で収集) 16/08/17 00:17:13情報DAGScheduler:最終段階の親:リスト() 16/08/17 00:17:13情報DAGScheduler:逃した親:リスト() 16/08/17 00:17:13 INFO DAGScheduler:ResultStage 1を送信する(MapPartitionsRDD [4]、Elastic.java:106のマップ)、親が不足していないこと 16/08/17 00:17:13情報MemoryStore:ブロードキャスト1をメモリ内の値としてブロックする(推定サイズ4.3 KB、無料7.7 KB) 16/08/17 00:17:13情報MemoryStore: BlockServerInfo:localhost:46356のメモリにbroadcast_1_piece0を追加しました(サイズ:2.5 KB、空き容量:511.1 MB) 16/08/17 00:17:13 INFO SparkContext:DAGScheduler.scalaのブロードキャストからブロードキャスト1を作成しました:1006 16/08/17 00:17:13 INFO DAGScheduler:ResultStage 1から1つの不足しているタスクを送信しています(MapPartitionsRDD [4 ] Elastic.java:106のマップ) 16/08/17 00:17:13 INFO TaskSchedulerImpl:タスク1でタスク1を追加する ^ C16/08/17 00:17:22 INFO SparkContext:stop()を呼び出すシャットダウンからOK 16/08/17 00:17:22 INFO SparkUI:Spark Web UIを停止しました。http://192.168.10.23:4040 16/08/17 00:17:22 INFO DAGScheduler:ResultStage 0(GuardConnect.java:60のforeach)が10,292秒で失敗しました。 16/08/17 00:17:22 INFO DAGScheduler:ジョブ0が失敗しました:GuardConnect.java:60のforeachが10,470974秒かかる スレッド "main"の例外org.apache.spark.SparkException:ジョブ0がキャンセルされました。 (DAGScheduler.scala:806) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply(0) DAGScheduler.scala:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cl eanUpAfterSchedulerStop(DAGScheduler.scala:804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop。 で:org.apache.spark.scheduler.DAGScheduler.stop(1581 DAGScheduler.scala)で onStop(DAGScheduler.scala:1658)org.apache.spark.util.EventLoop.stopで無料 (84 EventLoop.scala)組織でorg.apache.spark.util.Utils $ .tryLogNonFatalError(Utils.scala 1229) で :org.apache.spark.SparkContext $$ anonfun $ $ $ 9.apply MCV $ SP(1740 SparkContext.scala)を停止.apache.spark.SparkContext.stop(SparkContext.scala:1739) org.apache.spark.SparkContext $$ anonfun $ 3.apply $ MCV $ SP(SparkContext.scala:596)で org.apache.spark.utilで.SparkShutdownHook.run(ShutdownHookManager.scala:267)org.apache.spark.util.SparkShutdownHookManager $$ anonfun runAll $ $ 1 $$ anonfunで $ $ $ MCV SP 1.apply $ $ $ MCV SPを適用します(ShutdownHookManager.scala:239 org.apacheで) 。 spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ apply_schema $ MCV $ SP $ 1.apply(ShutdownHookManager.scala:239)org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfunで $ apply_schema $ MCV $ SP $ 1.apply(ShutdownHookManager.scala:239) org.apache.spark.util.SparkShutdownHookManagerでorg.apache.spark.util.Utils $ .logUncaughtExceptions(Utils.scala 1765) で$$ anonfun $ runAll $ 1.apply $ MCV $ SP(ShutdownHookManager.scala:239)org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.applyで (ShutdownHookManager.scala:239) org.apache.sparkで.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply(ShutdownHookManager.scala:239)scala.util.Try $ .applyで (Try.scala:161) org.apache.spark.util.SparkShutdownHookManager.runAllで( ShutdownHookManager.scala:2 39) org.apache.spark.util.SparkShutdownHookManager $$アノン$ 2.run(ShutdownHookManager.scala時:218) org.apache.hadoop.util.ShutdownHookManagerの$ 1.run(ShutdownHookManager.java:54) の組織でorg.apache.spark.SparkContext.runJobで (SparkContext.scala:1832):.apache.spark.scheduler.DAGScheduler.runJob(620 DAGScheduler.scala) org.apache.spark.SparkContext.runJobで(SparkContext.scala 1845)org.apache.spark.SparkContext.runJob(SparkContext.scalaで :org.apache.spark.SparkContext.runJob(SparkContext.scalaで1858) :1929) org.apache.spark.rdd.RDDで$$ anonfun $ foreachの$ 1.apply(RDD.scala:912)org.apache.spark.rdd.RDD $$ anonfun $ foreachの$ 1.applyで (RDD.scala:910) org.apachでe.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150):org.apache.spark.rdd.RDDで org.apache.spark.rdd.RDDOperationScope $ .withScope(111 RDDOperationScope.scala)で 。 org.apache.spark.api.java.JavaRDDLike $ class.foreachで(JavaRDDLike.scala:org.apache.spark.rdd.RDD.foreach(910 RDD.scala)で :スコープ(316 RDD.scala)とsun.reflect.NativeMethodAccessorImplでcom.seigneurin.spark.GuardConnect.main(GuardConnect.java:60) における46) :org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scalaにおける332) 。 sun.reflect.DelegatingMethodAccessorImpl.invokeで sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) (DelegatingMethodAccessでinvoke0(ネイティブメソッド) orImpl.java:43)java.lang.reflect.Method.invoke(Method.java:498で )で org.apache.spark.deploy.SparkSubmit $ .orgのapacheの$ $ $は$ SparkSubmit $$ runMainを展開キック( SparkSubmit.scala:org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1での731) (SparkSubmit.scala:org.apache.spark.deploy.SparkSubmit $ .submitにおける181) (SparkSubmit.scala:206) org.apache.spark.deploy.SparkSubmit.mainで(SparkSubmit.scala) 08/16/17午後12時17分22秒ERROR:org.apache.spark.deploy.SparkSubmit $ .main(121 SparkSubmit.scala)でLiveListenerBus:SparkListenerBusが停止alreadyloggedinています!ドロップイベントSparkListenerStageCompleted([email protected]) 16/08/17午後12時17分22秒INFO DAGScheduler:結果田下1(弾性に集まります。java:94)が9,301秒で失敗しました 16/08/17 00:17:22エラーLiveListenerBus:SparkListenerBusがすでに停止しています!イベントを落とすSparkListenerStageCompleted([email protected]) 16/08/17 00:17:22エラーLiveListenerBus:SparkListenerBusが既に停止しています! SparkListenerJobEnd(0,1471385842813、JobFailed(org.apache.spark.SparkException:SparkContextがシャットダウンされたためジョブ0がキャンセルされました)) 16/08/17 00:17:22 INFO DAGScheduler:ジョブ1が失敗しました:Elasticで収集します。 java:94、9317650 s 16/08/17 00:17:22エラーExecutor:ステージ0.0(TID 0)のタスク0.0の例外 org.apache.spark.SparkException:SparkContextが終了したためジョブ1がキャンセルされましたorg.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.applyで (DAGScheduler.scala:org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply(806 DAGScheduler.scala)で ダウン:804) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.s (DAGScheduler.scala:1658) at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) at org.apache。 spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) at org.apache.spark.SparkContext $$ anonfun $ stop $ 9.apply $ mcV $ sp(SparkContext.scala:1740) at org.apache.spark。 util.Utils $ .tryLogNonFatalError(Utils.scala:1229) at org.apache.spark.SparkContext.stop(SparkContext.scala:1739) at org.apache.spark.SparkContext $$ anonfun $ 3.apply $ mcV $ sp (ShutdownHookManager.scala:267) at org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ appl(SparkContext.scala:596) y $ mcV $ sp $ 1.apply $ mcV $ sp(ShutdownHookManager.scala:239) 、org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(ShutdownHookManager .scala:239) at org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply(ShutdownHookManager.scala:239) at org.apache.spark.util .Utils $ .logUncaughtExceptions(Utils.scala:1765) 、org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply $ mcV $ sp(ShutdownHookManager.scala:239) at org.apache.spark。 (ShutdownHookManager.scala:239) 、org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply(ShutdownHookManager.scala:239) をscala.utilにしてください。試してみてください$ .apply(Try.scala:161)(ShutdownHookManager.scala:239) at org.apache.spark.util.SparkShutdownHookManager $$ anon $ 2.run(ShutdownHookManager.scala:218) at org.apache。 org.apache.spark.SparkContext.runJobで(SparkContext.scala:org.apache.spark.scheduler.DAGScheduler.runJob(620 DAGScheduler.scala)での$ 1.run(ShutdownHookManager.java:54) hadoop.util.ShutdownHookManager :1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob (SparkContext.scala:1929) at org.apache.spark.rdd.RDD $$ anonfun $ collect $ 1.apply(RDD.scala:927) at org.apac hel.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD。 (RDD.scala:926) at org.apache.spark.api.java.JavaRDDLike $ class.collect(JavaRDDLike.scala:RDD.scala:316) によるorg.apache.spark.rdd.RDD.collect(RDD.scala: 339) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46) at com.seigneurin.spark.Elastic。RddToListHits(Elastic.java:94)com.seigneurinでcom.seigneurin.spark.OXO.startDetectNow(OXO.java:148)でcom.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126) で 。 spark.GuardConnect.lambda $ main $ 1282d8df $ 1(GuardConnect.java:60) at org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreach $ 1.apply(JavaRDDLike.scala:332) at org.apache .spark.api.java.JavaRDDLike $$ anonfun $ foreach $ 1.apply(JavaRDDLike.scala:332) at scala.collection.Iterator $ class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator .foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1 $$ anonfun $ apply $ 32.Apply(RDD.scala:912) at org.apache.spark.rdd .RDD $$ anonfun $ foreach $ 1 $$ anonf $ 32.apply(RDD.scala:912) at org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89 ) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/08/17 00:17:22エラーLiveListenerBus:SparkListenerBusはすでに停止しています! (SparkContextがシャットダウンされたためジョブ1がキャンセルされました) 16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint:MapOutputTrackerMasterEndpoint stopped! 16/08/17 00:17:22情報メモリストア:メモリストアがクリアされました 16/08/17 00:17:22情報BlockManager:BlockManagerが停止しました 16/08/17 00:17:22 INFO BlockManagerMaster:BlockManagerMaster stopped 16/08/17 00:17:22 INFO OutputCommitCoordinator $ OutputCommitCoordinatorEndpoint:OutputCommitCoordinatorが停止しました。 16/08/17 00:17:22 INFO RemoteActorRefProvider $ RemotingTerminator:リモートデーモンをシャットダウンします。 16/08/17 00:17:22 INFO RemoteActorRefProvider $ RemotingTerminator:リモートデーモンをシャットダウンします。リモートトランスポートをフラッシュして処理を進めます。 16/08/17 00:17:22 INFO TaskSetManager:ステージ1.0でタスク0.0を開始しました(TID 1、ローカルホスト、パーティション0、ANY、6751バイト) 16/08/17 00:17:22エラー受信トレイ:無視するエラー java.util.concurrent.RejectedExecutionException:[email protected]から拒否されたタスク[email protected] [終了、プールサイズ= 0、アクティブスレッド= 0、キューに登録されたタスク= 0、完了したタスク= 1] at java.util.concurrent.ThreadPoolExecutor $ AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util .concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at org.apache.spec.executor.Executor.launchTask(Executor.scala:128) at org.apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ reviveOffers $ 1.apply(LocalBackend.scala:86) at org.apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ reviveOffers $ 1.apply( LocalBackend.scala:84) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84) at org。 apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ receive $ 1.applyOrElse(LocalBackend.scala:69) at org.apache.spark.rpc.netty.Inbox $$ anonfun $ process $ 1.apply $ mcV $ sp Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher $ Me ssageLoop。実行(Dispatcher.scala:215)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)は、Javaで でjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) で 。 lang.Thread.run(Thread.java:745) 16/08/17 00:17:22 INF SparkContext:SparkContextを正常に停止しました 16/08/17 00:17:22情報ShutdownHookManager:シャットダウンフック 16/08/17 00:17:22情報ShutdownHookManager:ディレクトリ/ tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157の削除 16/08/17 00:17:22 INFO RemoteActorRefProvider $ RemotingTerminator:リモートシャットダウン。 16/08/17〇時17分22秒INFO ShutdownHookManager:削除するディレクトリを/ tmp /火花8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpdの-6d3aeb80-808c-4749-8f8b-ac9341f990a7

感謝場合あなたは私にいくつかの助言を与えることができます。

+0

私は、内部例外が必要です。これはすべてあなたの 'foreach'に問題があることを伝えています。 –

+0

RDD の直後に私はしばらく(1)を持っていた可能性がありますか?多分私はRDDで作業をスレッド化できると思っていました。私はエラー –

+0

の完全なメッセージを追加するRDDのRDDは本当に意味をなさないが、はい、コンパイルするためにコンパイラを騙す方法があります。 – GameOfThrows

答えて

0

RDD内にRDDを作成することはできません。RDDの種類は何でも可能です。 これが最初のルールです。これは、RDDがデータを指す抽象であるためです。

+0

okありがとうございます;) –

関連する問題