私はカフカのトピックから受信して処理が完了した後にストリームの処理を終了したいと思います。停止は、(awaitTerminationOrTimeout)のような特定の時間であってはなりません。トピックが消耗した後にsparkstreamingcontextを停止する方法はありますか? Dstream [T]をT値と比較して制御フローを指示する方法はありますか?stop sparkストリーミングコンテクストkafkaDirectStream
1
A
答えて
0
私はisEmpty
がtrueを返すと、ストリームが空の場合headOption
はになしてはならないことを約80%cetainです。
0
ストリームの読み込みを開始する前に、トピック内のすべてのパーティションの最新のオフセットを取得してから、受信したオフセットがいつ取得されたかを確認するのが最善の方法です。トピックのオフセットを取得する方法については、previous answerを参照してください。
フローはビーイングを終わる:
- あなたが早い を返す
OffsetRequest
を行い、各パーティションについてSimpleConsumer
- を作成し、各ブローカのトピックのパーティションやブローカー
- を取得最新のオフセット(前回の回答を参照)
- メッセージを読むときに、 の受信メッセージのオフセットを確認して、パーティションの既知の最後のオフセットである
- すべてのオフセットは、パーティションごとに受信したら、あなたの
OffsetRequest
あなたがkafkaStream.foreachRDDの
関連する問題
- 1. docker stop spark containerが終了しない
- 2. Stop setInterval
- 3. htaccess stop processing rules
- 4. ProgressDialog cant stop
- 5. Start stop JMSメッセージリスナー
- 6. stop tableviewcontrollerスクロール
- 7. Scanner + System.in stop condition
- 8. ゴースレッド - STOP実行
- 9. regex syntax stop search
- 10. magento stop checkout
- 11. slideToggle issue with stop()
- 12. jQueryのスライドメニュー - .stop()
- 13. jQuery Stop Animation
- 14. java swing stop function
- 15. jQuery stopクリックスパム
- 16. Adobe Air stop();
- 17. Jquery .Stop()slowdown
- 18. Android force stop apps
- 19. Gps location listener stop
- 20. C#Stop BackgroundWorker
- 21. onPause/Resume/Start/Stop order
- 22. Java split beforeとstop
- 23. スレッド内のStop()メソッド?
- 24. BlackBerryのストリームオーディオ/ STOPストリーム
- 25. jQueryアニメーションキューと.stop(真)
- 26. パッケージtm stop-wordパラメータ
- 27. cocos2d start/stop runningシーン
- 28. MediaPlayerクラスのstopメソッド
- 29. JQuery hover stop()。fadeIn()issue
- 30. python stop selenium standalone server
rdd.isEmptyはtrueを返しません行われにreceieved 最新のと同じです。しかし、sparkStreamingContext.stopでアプリケーションを停止することはありません。 –
'future({()=> while(!kafkaMessageStream.isEmpty){ Thread.sleep(100)} sparkTreamingContext。 (真) }) ' これは100ミリ秒ごとにメッセージがあるかどうかをチェックし、メッセージがない場合は停止します。 –
偽陽性を得ることができます。たとえば、ブローカがバッチ間隔よりも長い時間忙しくなった場合などです。 –