私はKafka Streamsアプリケーションのバージョン - 0.11を使用して、いくつかのトピックからデータを取り出し、データを結合して別のトピックに配置します。Kafkaストリームスタートアップの問題 - org.apache.kafka.streams.errors.LockException
カフカ構成:
5 kafka brokers - version 0.11
Kafka Topics - 15 partitions and 3 replication factor.
レコードのほとんど何百万人が消費されている/毎時で生産しました。私がダウンして任意のカフカブローカーを取るたびに、それは例外の下にスロー:私は問題を解決するのに役立つかもしれないストリームをcleaningUp少数JIRA課題で読んだことがある
org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62)
at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
。しかし、私たちがKafkaストリームアプリケーションを開始するたびにストリームを浄化するのは正しい解決策ですか、パッチですか?また、ストリームクリーンアップはアプリケーションの起動を遅らせるでしょうか?
注:私はstreams.cleanUpを(呼び出す必要があります)streams.start()を呼び出す前に、私はorg.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
を見てカフカストリームアプリケーションに
ただし、この警告を解決するには時間がかかります。このための再試行回数を減らす設定がありますか? –
そして理想的に私はcleanup()を呼び出さなければならない? –
いいえ、あなたはしないでください!それは不必要なオーバーヘッドの多くを導入するだろう!ドキュメントの[ヒント]ボックスを比較してください:https://docs.confluent.io/current/streams/developer-guide.html#step-2-reset-the-local-environments-of-your-application-instances –