2015-09-17 17 views
5

私はSparkストリーミングを使用して一意のユーザーを数えます。私はupdateStateByKeyを使用しているので、チェックポイントディレクトリを設定する必要があります。アプリケーションを起動している間、私はまたthe example in the docとして、チェックポイントからのデータをロード:私のコードが変更された場合ここでスパークストリーミングアプリケーションを再デプロイするためのチェックポイントの設定方法は?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

が、質問ですが、私はコードを再配備する、チェックポイントがいくらロードされませんコードは変更されますか?または、自分のロジックを使用してデータを永続化し、次の実行時にロードする必要があります。

独自のロジックを使用してDStreamを保存して読み込むと、アプリケーションが失敗して再起動すると、チェックポイントディレクトリと自分のデータベースの両方からデータが読み込まれませんか?

答えて

3

チェックポイント自体にメタデータ、rdd、dag、さらにはロジックが含まれます。ロジックを変更して最後のチェックポイントから実行しようとすると、例外が発生する可能性が非常に高くなります。 独自のロジックを使用してチェックポイントとしてデータを保存する場合は、スパークアクションを実装してチェックポイントデータを任意のデータベースにプッシュし、次回の実行時にチェックポイントデータを初期RDDとしてロードする必要がありますuはupdateStateByKey APIを使用しています)、ロジックを続行します。

2

私はSparkのメールリストにこの質問をして答えを得ました。私はそれをmy blogで分析しました。ここで要約を投稿します:

方法は、チェックポイントと独自のデータ読み込みメカニズムの両方を使用することです。しかし、我々はinitalRDDupdateStateByKeyであるとデータをロードする。だから、両方の状況では、データは失われませんどちらも複製:

  1. 私たちは、コードを変更し、スパークアプリケーション、我々はシャットダウン古いスパーク優雅アプリケーションとクリーンアップチェックポイント・データを再配置すると、これだけロードされたデータがあります私たちが保存したデータ

  2. Sparkアプリケーションが失敗して再起動すると、チェックポイントからデータがロードされます。しかし、DAGのステップは保存されるので、私たち自身のデータをinitalRDDとして再びロードすることはありません。したがって、ロードされるのはチェックポイントされたデータだけです。

関連する問題