多くのステップを実行し、最後にマイクロバッチのルックアップが行われているか、プリロードされたRDDに参加するとします。 12時間ごとにプリロードされたRDDをリフレッシュする必要があります。これどうやってするの。ストリーミング・コンテキストに関係しないことは、ストリーミングRDDのフォーム1とはどのようにして得られるのか、私の理解には再現されません。ストリーミングdstreamがいくつのパーティションにあるかにかかわらず、1つのコールだけを行う必要がありますsparkストリーミングn回のバッチの後にルックアップ非ストリームrddをリロードする方法
1
A
答えて
3
これは、リロードする必要があるときに外部RDDを再作成することで可能です。それは与えられた瞬間にアクティブであるRDD
参照を保持するために可変変数を定義する必要があります。 dstream.foreachRDD
内で、RDD参照をリフレッシュする必要がある瞬間を確認できます。
これは、それは次のようになります方法の例です:
val stream:DStream[Int] = ??? //let's say that we have some DStream of Ints
// Some external data as an RDD of (x,x)
def externalData():RDD[(Int,Int)] = sparkContext.textFile(dataFile)
.flatMap{line => try { Some((line.toInt, line.toInt)) } catch {case ex:Throwable => None}}
.cache()
// this mutable var will hold the reference to the external data RDD
var cache:RDD[(Int,Int)] = externalData()
// force materialization - useful for experimenting, not needed in reality
cache.count()
// a var to count iterations -- use to trigger the reload in this example
var tick = 1
// reload frequency
val ReloadFrequency = 5
stream.foreachRDD{ rdd =>
if (tick == 0) { // will reload the RDD every 5 iterations
// unpersist the previous RDD, otherwise it will linger in memory, taking up resources.
cache.unpersist(false)
// generate a new RDD
cache = externalData()
}
// join the DStream RDD with our reference data, do something with it...
val matches = rdd.keyBy(identity).join(cache).count()
updateData(dataFile, (matches + 1).toInt) // so I'm adding data to the static file in order to see when the new records become alive
tick = (tick + 1) % ReloadFrequency
}
streaming.start
このソリューションに来て前、私はRDDにpersist
フラグでプレーする可能性を研究し、それはように動作しませんでした期待される。 unpersist()
のように見えるのは、RDDが再び使用されたときにRDDの再マテリアライズを強制しません。
関連する問題
- 1. 直接ストリームを使用したKafka Sparkストリーミングでコンシューマグループを指定する方法
- 2. Apache Spark RDDのコレクションを1つのRDDに変換するJava
- 3. Sparkで既存のRDDにRDDを追加するには?
- 4. RDD Aggregate in spark
- 5. Apache Spark RDDワークフロー
- 6. Sparkで明示的にRDDを実現する方法
- 7. ストリームをストリーミングする方法は?
- 8. 2つのSparkコンテキスト間でSpark RDDを共有するには?
- 9. RDDとApache Sparkのパーティション
- 10. Spark DataSetとRDDの違い
- 11. Apache Spark RDDのScalazタイプクラス
- 12. 2つのストリーム間のSparkストリーミング共有状態
- 13. Sparkで同じRDDを2回キャッシュするとどうなるのですか?
- 14. Spark RDDのフィールドを選択します
- 15. ストリーミングKmeans Spark JAVA
- 16. SparkでのRDD数の最大制限
- 17. Windows 7のSparkでディレクトリをストリーミングする
- 18. spark:マップ内でルックアップを使用する
- 19. Spark RDDの文字列置換
- 20. Apache Sparkで特定のワーカーにRDDを処理する
- 21. RDDからRDDのコレクションを作成する方法は?
- 22. RDD [(String、String)]をRDD [Array [String]]に変換する方法は?
- 23. ページのリロード後にチェックボックスの状態を確認する方法
- 24. Spark/Scala - リストのRDDでkeyByを使用する[Int、Double]
- 25. primefacesのアップロード後にjsfページをリロードする方法は?
- 26. CassandraからSparkへのストリーミングを有効にするには?
- 27. 回帰ldルックアップ
- 28. ActionMailer :: Base.deliveriesは最後のメールがN回
- 29. Spark RDDから空のパーティションを削除します
- 30. Spark RDDの各キーの最大値を取得
助けがあれば分かります..... – subhankar
私は考えがありますが、まずそれをテストする必要があります。 – maasg