2016-03-25 13 views
1

処理中に何らかの例外が発生した場合、以下のログでわかるように、Sparkはそれを3回以上再処理しようとします。その後、ステージは失敗とマークされます。私は、ステージが後でそれを分析するのに失敗したすべてのデータを取得したい、またはそれ以外で何かを実行したい。これはどうすればできますか? SparkListenersでこれを調べていますが、これはデベロッパーAPIのようです。Spark:ステージが失敗した元のデータを取得するにはどうすればよいですか?

ありがとうございました。

16/03/23 18:33:00 WARN TaskSetManager: Lost task 1.0 in stage 11.0 (TID 88, 192.168.213.53): java.lang.RuntimeException: Amit baby its exception time 
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount$1.call(JavaRecoverableNetworkWordCount.java:141) 
    at com.yourcompany.custom.identifier.JavaRecoverableNetworkWordCount$1.call(JavaRecoverableNetworkWordCount.java:131) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.1 in stage 11.0 (TID 89, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.1 in stage 11.0 (TID 89) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 1] 
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.2 in stage 11.0 (TID 90, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.2 in stage 11.0 (TID 90) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 2] 
16/03/23 18:33:00 INFO TaskSetManager: Starting task 1.3 in stage 11.0 (TID 91, 192.168.213.53, NODE_LOCAL, 2535 bytes) 
16/03/23 18:33:00 INFO TaskSetManager: Lost task 1.3 in stage 11.0 (TID 91) on executor 192.168.213.53: java.lang.RuntimeException (Amit baby its exception time) [duplicate 3] 
16/03/23 18:33:00 ERROR TaskSetManager: Task 1 in stage 11.0 failed 4 times; aborting job 
16/03/23 18:33:00 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
16/03/23 18:33:00 INFO TaskSchedulerImpl: Cancelling stage 11 
+0

質問はなぜ投票されたのですか? –

+1

あなたはまた、話題外であることに近い票を得ました。それはおそらくダウンボォートと一緒に来ました。なぜ誰かがこの質問が話題外だと思うのか分かりません。 –

答えて

3

これはできません。タスク内で処理されるデータは、通常、その一部であるジョブより長くは存続しません。ステージが失敗すると、ジョブはもはや存在せず、データはガベージコレクションのためにアップされます。そこへの参照はないので、あなたはそれに手を差し伸べることはできません。

SparkListenerは確かDeveloperAPIですが、それはあなたがそれを使用することはできませんという意味ではありません。それはまだ公開APIです。 Sparkのバージョン間での安定性が保証されていないことを意味します。おそらく1年前からSparkListenerを使用していますが、実際は完全に安定しています。気楽にしてください。しかし、私はそれがあなたの問題を解決できるとは思わない。

しかし、それは有効で興味深い考えです。データにアクセスできることは、デバッグに大いに役立ちます。 Spark JIRAに機能要求を入れることができます。それは簡単なことではありません。 Sparkのタスクは、あなたが与えたユーザーコードだけではありません。だから、たとえタスクの入力がデバッグのために利用可能になっても、それをうまく利用する方法は自明ではありません。とにかくそれは私が考える会話の価値がある!

+0

スパークチェックポイントをこれに使用できますか? –

+1

チェックポイントは、RDD全体を書き込みます。ステージの前にすべてのRDDをチェックポイントすると、タスク入力をかなり再構築できます。しかし、これは大きなオーバーヘッドになります。失敗した場合、入力をチェックポイントに戻すことはできません。 –

+0

あなたの答えをもう一度ありがとう、コンテキストが開始された後にDAGを更新する方法はありますか?すぐに多くのものを投げて申し訳ありません。 –

関連する問題