2016-06-12 5 views
4

私はsc.setCheckpointDirメソッドでチェックポイントディレクトリを設定しました。私はその後、RDDのチェックポイント作成したRDD.checkpoint()はチェックポイントディレクトリにデータを格納しません

/checkpointDirectory/ 

rdd.checkpoint()とディレクトリ内を、私は今、文字のランダムな文字列の形で、新しいチェックポイントを表す新しいディレクトリを参照してください。そのディレクトリの中には何もありません。

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

は、その後カップルの変換を行った後、私は再びrdd.checkpoint()を実行し、何もその最近作成したディレクトリ内にまだ存在しない

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty] 

私はcheckpoint()間違っを使用していますか?そのディレクトリで正しく動作することを知るためには、どうすればいいでしょうか?

答えて

4

checkpointは、Sparkの他の多くの操作と同様に、怠け者です。データは、特定のRDDがマテリアライズされている場合にのみチェックポイントされます。表示される空のディレクトリは、アプリケーション固有のチェックポイントディレクトリです。

チェックポイントを実行するには、対応するRDDを評価するアクションをトリガーする必要があります。例(ローカルモード)で:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated] 

とアクションの後に::

import glob 
import os 
from urllib.parse import urlparse 

sc.setCheckpointDir("/tmp/checkpoints/") 
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*") 

rdd = sc.range(1000, 10) 
plus_one = rdd.map(lambda x: x + 1) 
plus_one.cache() 
plus_one.checkpoint() # No checkpoint dir here yet 

[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## [] 
plus_one.isCheckpointed() 
## False 

# After count is executed you'll see rdd specific checkpoint dir 
plus_one.count() 
[os.path.split(x)[-1] for x in glob.glob(ch_dir)] 
## ['rdd-1'] 
plus_one.isCheckpointed() 
## True 

また、前のデバッグ文字列を解析することができ

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated] 
## |  CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B 
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated] 

RDDが計算されます前に、あなたが見ることができるように最初からcountの後にReliableCheckpointRDDが表示されます。

0

チェックポイントは定期的に実行されます(チェックポイントの期間)。チェックポイントの持続時間をあなたのスパークコンテキストに伝える必要があります。

関連する問題