2016-05-02 12 views
1

私はカフカのトピックから受信して処理が完了した後にストリームの処理を終了したいと思います。停止は、(awaitTerminationOrTimeout)のような特定の時間であってはなりません。トピックが消耗した後にsparkstreamingcontextを停止する方法はありますか? Dstream [T]をT値と比較して制御フローを指示する方法はありますか?stop sparkストリーミングコンテクストkafkaDirectStream

答えて

0

私はisEmptyがtrueを返すと、ストリームが空の場合headOptionはになしてはならないことを約80%cetainです。

+0

rdd.isEmptyはtrueを返しません行われにreceieved 最新のと同じです。しかし、sparkStreamingContext.stopでアプリケーションを停止することはありません。 –

+0

'future({()=> while(!kafkaMessageStream.isEmpty){ Thread.sleep(100)} sparkTreamingContext。 (真) }) ' これは100ミリ秒ごとにメッセージがあるかどうかをチェックし、メッセージがない場合は停止します。 –

+0

偽陽性を得ることができます。たとえば、ブローカがバッチ間隔よりも長い時間忙しくなった場合などです。 –

0

ストリームの読み込みを開始する前に、トピック内のすべてのパーティションの最新のオフセットを取得してから、受信したオフセットがいつ取得されたかを確認するのが最善の方法です。トピックのオフセットを取得する方法については、previous answerを参照してください。

フローはビーイングを終わる:

  1. あなたが早い を返すOffsetRequestを行い、各パーティションについてSimpleConsumer
  2. を作成し、各ブローカのトピックのパーティションやブローカー
  3. を取得最新のオフセット(前回の回答を参照)
  4. メッセージを読むときに、 の受信メッセージのオフセットを確認して、パーティションの既知の最後のオフセットである
  5. すべてのオフセットは、パーティションごとに受信したら、あなたのOffsetRequestあなたがkafkaStream.foreachRDDの