2015-10-21 21 views
7

私は、カフカのトピックから来るメッセージに応答するJavaベースのSpark Streamingアプリケーションを開発中です。各メッセージについて、アプリケーションは何らかの処理を行い、その結果を別のカフカのトピックに書き戻します。Sparkでキャッチされない例外処理

予期しないデータ関連の問題により、RDDで動作するコードが失敗し、例外がスローされることがあります。それが起こると、必要なアクションを実行してエラーのトピックにメッセージをドロップするジェネリックハンドラが必要になります。今のところ、これらの例外はスパーク自体によってスパークのログに書き込まれます。

RDDで動作するコードブロックごとにtry-catchブロックを書き込むのではなく、これを実行する最良の方法は何ですか?

+0

トライキャッチでこれを実装することができると思います。 Sparkで今のところできなかったことがあれば、専門家が近い投票をする前に少なくともある程度の光を放つことができたら、私は感謝します。説明なしで近くの投票をしても、コミュニティには何の意味もありません。 –

+0

これを行う汎用関数を書くことができます。スパーク例外(.mapや.filterのようなトランスフォーマーはアクションによって遅延実行される)をスローできる唯一のものなので、RDDアクションの周りにラップする必要があります。 (これはScalaであると仮定します)暗黙的に何かを試すこともできますし、エラーハンドリングを強化したRDDクラスを作成して、タイプシグネチャだけでエラー処理を暗黙に強制することもできます。私は近い投票をしませんでしたが、私は "ベスト"アプローチはアプリケーションのニーズに幾分主観的だと思います。 – Rich

+0

ありがとう@リッチ。だから、基本的に言っていることは、今のところこれを処理するためにSparkに組み込みの方法がないということです。各アプリケーションはそれを処理する必要があります。あなたが答えとしてあなたのコメントを投稿できるなら、私はそれを受け入れるでしょう。 –

答えて

3

これを行う汎用関数を書くことができます。スパーク例外を発生させる唯一のもの(.map.filterなどのトランスフォーマーはアクションによって遅延実行される)がRDDアクションの周りにラップする必要があります。

(これはScalaにあると仮定します)おそらく暗黙のうちに何かを試すこともできます。 RDDを保持し、エラーを処理するクラスを作成します。ここではそれがどのように見えるかのスケッチ:あなたはfailsafeActionか何かにエラートピックメッセージを追加することができます

implicit class FailSafeRDD[T](rdd: RDD[T]) { 
    def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { 
    fn(rdd) 
    } 
} 

あなたが失敗した場合に毎回やりたいです。使用方法は次のようになります:

val rdd = ??? // Some rdd you already have 
val resultOrException = rdd.failsafeAction { r => r.count() } 

このほかにも、「最良の」アプローチはアプリケーションのニーズに多少の主観があると思います。

2

私はあなたにも、私は誰かがこの質問は意見に基づいていると言って近くの投票をキャストしていることを確認=>

dstream.foreachRDD { case rdd: RDD[String] => 
    rdd.foreach { case string: String => 
     try { 
     val kafkaProducer = ... 
     val msg = ... 
     kafkaProducer.send(msg) 
     } catch { 
     case d: DataException=> 
      val kafkaErrorProducer = ... 
      val errorMsg = ... 
      kafkaErrorProducer.send(errorMsg) 
     case t: Throwable => 
      //further error handling 
     } 
    } 
} 
関連する問題