2016-04-29 9 views
1

多くのステップを実行し、最後にマイクロバッチのルックアップが行われているか、プリロードされたRDDに参加するとします。 12時間ごとにプリロードされたRDDをリフレッシュする必要があります。これどうやってするの。ストリーミング・コンテキストに関係しないことは、ストリーミングRDDのフォーム1とはどのようにして得られるのか、私の理解には再現されません。ストリーミングdstreamがいくつのパーティションにあるかにかかわらず、1つのコールだけを行う必要がありますsparkストリーミングn回のバッチの後にルックアップ非ストリームrddをリロードする方法

+0

助けがあれば分かります..... – subhankar

+0

私は考えがありますが、まずそれをテストする必要があります。 – maasg

答えて

3

これは、リロードする必要があるときに外部RDDを再作成することで可能です。それは与えられた瞬間にアクティブであるRDD参照を保持するために可変変数を定義する必要があります。 dstream.foreachRDD内で、RDD参照をリフレッシュする必要がある瞬間を確認できます。

これは、それは次のようになります方法の例です:

val stream:DStream[Int] = ??? //let's say that we have some DStream of Ints 

// Some external data as an RDD of (x,x) 
def externalData():RDD[(Int,Int)] = sparkContext.textFile(dataFile) 
    .flatMap{line => try { Some((line.toInt, line.toInt)) } catch {case ex:Throwable => None}} 
    .cache() 

// this mutable var will hold the reference to the external data RDD 
var cache:RDD[(Int,Int)] = externalData() 
// force materialization - useful for experimenting, not needed in reality 
cache.count() 
// a var to count iterations -- use to trigger the reload in this example 
var tick = 1 
// reload frequency 
val ReloadFrequency = 5 

stream.foreachRDD{ rdd => 
    if (tick == 0) { // will reload the RDD every 5 iterations 
    // unpersist the previous RDD, otherwise it will linger in memory, taking up resources. 
    cache.unpersist(false) 
    // generate a new RDD 
    cache = externalData() 
    } 
    // join the DStream RDD with our reference data, do something with it... 
    val matches = rdd.keyBy(identity).join(cache).count() 
    updateData(dataFile, (matches + 1).toInt) // so I'm adding data to the static file in order to see when the new records become alive 
    tick = (tick + 1) % ReloadFrequency 
} 
streaming.start 

このソリューションに来て前、私はRDDにpersistフラグでプレーする可能性を研究し、それはように動作しませんでした期待される。 unpersist()のように見えるのは、RDDが再び使用されたときにRDDの再マテリアライズを強制しません。

+0

私はこれを試して、それがどうなるかを見てみよう。 – subhankar

+0

@subhankar私はそれをテストした。できます。 – maasg

+0

私は、foreachRddがおそらく各RDDに対して1つのコールを行うと考えています。したがって、条件が任意のバッチで一致した場合、外部へのコールはdstreamのRDD数と同じ回数だけ発生します – subhankar

関連する問題