2017-02-01 6 views
0

私はCloudera 5.8.3でSpark 1.6.0を使用します。
私はSpark Streamingでは、バッチ処理が完了したことを検出する方法はありますか?

val stream = KafkaUtils.createDirectStream[...](...) 
val mappedStream = stream.transform { ... }.map { ... } 
mappedStream.foreachRDD { ... } 
mappedStream.foreachRDD { ... } 
mappedStream.map { ... }.foreachRDD { ... } 

が最後に実行することが保証されており、上記のforeachRDD sが実行を終了した場合にのみされ、最後のforeachRDDを登録する方法はあり、その上に定義された変換のDStream対象とたくさん持っていますか?
つまり、Spark UIにジョブが完了したことが表示されたとき、それが軽量関数を実行したいときです。

私はこれを実現するためのAPIがありますか?

おかげで

+0

'mappedStream'で実行されたすべてのロジックが、最後の行のマップを含む1つのforeachRDDで実行されるように、コードを再構成できるかどうかだけに疑問がありますか?それはちょうど 'foreachRDD'は変換ではありません。 DStreamのマップは、foreachRDDの各rddのマップを実行するのと同じですか?それが理にかなっていれば? – ImDarrenG

+0

一日の終わりに、私はおそらく単一の 'foreachRDD'に絞り込み、'最後の 'foreachRDD'は実装するのが非常に簡単です。しかし、私はそれらのために定義された異なるデータパイプラインを持つ複数のソースを持っています。この実装には、設定を読み込んでそれに応じてソースを生成するメインクラスがあり(カフカソース)、それぞれのソースは異なるデータパイプライン、つまりストリームの処理方法を定義するクラスを通過します。パイプラインクラスで定義されている変換に関係なく、メインクラスの終了バッチを検出したいと思います。 –

答えて

4

ストリーミングリスナーを使用すると、あなたのための問題を解決する必要があります:

を(申し訳ありません、それはJavaの例です)

ssc.addStreamingListener(new JobListener()); 

// ... 

class JobListener implements StreamingListener { 

@Override 
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) { 

     System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo().totalDelay().get().toString() + " ms"); 

    } 

    /* 

    snipped other methods 

    */ 


} 

https://gist.github.com/akhld/b10dc491aad1a2007183

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming/spark-streaming-streaminglisteners.html

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener

+0

まさに私が探していたもの、ありがとう! –

+0

心配はいりません。将来の参照のためにどのように乗っていくのか聞くことに興味がありますか?余分な分がある場合にのみ:o) – ImDarrenG

関連する問題