私はSparkストリーミングを使用して一意のユーザーを数えます。私はupdateStateByKey
を使用しているので、チェックポイントディレクトリを設定する必要があります。アプリケーションを起動している間、私はまたthe example in the docとして、チェックポイントからのデータをロード:私のコードが変更された場合ここでスパークストリーミングアプリケーションを再デプロイするためのチェックポイントの設定方法は?
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
が、質問ですが、私はコードを再配備する、チェックポイントがいくらロードされませんコードは変更されますか?または、自分のロジックを使用してデータを永続化し、次の実行時にロードする必要があります。
独自のロジックを使用してDStreamを保存して読み込むと、アプリケーションが失敗して再起動すると、チェックポイントディレクトリと自分のデータベースの両方からデータが読み込まれませんか?