私はCloudera 5.8.3でSpark 1.6.0を使用します。
私はSpark Streamingでは、バッチ処理が完了したことを検出する方法はありますか?
val stream = KafkaUtils.createDirectStream[...](...)
val mappedStream = stream.transform { ... }.map { ... }
mappedStream.foreachRDD { ... }
mappedStream.foreachRDD { ... }
mappedStream.map { ... }.foreachRDD { ... }
が最後に実行することが保証されており、上記のforeachRDD
sが実行を終了した場合にのみされ、最後のforeachRDD
を登録する方法はあり、その上に定義された変換のDStream
対象とたくさん持っていますか?
つまり、Spark UIにジョブが完了したことが表示されたとき、それが軽量関数を実行したいときです。
私はこれを実現するためのAPIがありますか?
おかげで
'mappedStream'で実行されたすべてのロジックが、最後の行のマップを含む1つのforeachRDDで実行されるように、コードを再構成できるかどうかだけに疑問がありますか?それはちょうど 'foreachRDD'は変換ではありません。 DStreamのマップは、foreachRDDの各rddのマップを実行するのと同じですか?それが理にかなっていれば? – ImDarrenG
一日の終わりに、私はおそらく単一の 'foreachRDD'に絞り込み、'最後の 'foreachRDD'は実装するのが非常に簡単です。しかし、私はそれらのために定義された異なるデータパイプラインを持つ複数のソースを持っています。この実装には、設定を読み込んでそれに応じてソースを生成するメインクラスがあり(カフカソース)、それぞれのソースは異なるデータパイプライン、つまりストリームの処理方法を定義するクラスを通過します。パイプラインクラスで定義されている変換に関係なく、メインクラスの終了バッチを検出したいと思います。 –