0

私はKafka Streamsアプリケーションのバージョン - 0.11を使用して、いくつかのトピックからデータを取り出し、データを結合して別のトピックに配置します。Kafkaストリームスタートアップの問題 - org.apache.kafka.streams.errors.LockException

カフカ構成:

5 kafka brokers - version 0.11 
Kafka Topics - 15 partitions and 3 replication factor. 

レコードのほとんど何百万人が消費されている/毎時で生産しました。私がダウンして任意のカフカブローカーを取るたびに、それは例外の下にスロー:私は問題を解決するのに役立つかもしれないストリームをcleaningUp少数JIRA課題で読んだことがある

org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10 
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99) 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80) 
    at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73) 
    at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 

。しかし、私たちがKafkaストリームアプリケーションを開始するたびにストリームを浄化するのは正しい解決策ですか、パッチですか?また、ストリームクリーンアップはアプリケーションの起動を遅らせるでしょうか?

注:私はstreams.cleanUpを(呼び出す必要があります)streams.start()を呼び出す前に、私はorg.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10を見てカフカストリームアプリケーションに

答えて

1

を起動するたびに、実際に予想され、自分自身を解決する必要があります。スレッドは、別のスレッドがロックを解除して後で再試行するまで待機するためにバックオフします。したがって、2番目のスレッドがロックを解除する前にリトライが発生した場合に、このWARNメッセージが複数回ログに記録されることがあります。

しかし、最終的にロックは2番目のスレッドによって解放され、1番目のスレッドはロックを取得できます。その後、ストリームはただ前進する必要があります。注意、それはWARNメッセージであり、エラーではありません。

+0

ただし、この警告を解決するには時間がかかります。このための再試行回数を減らす設定がありますか? –

+0

そして理想的に私はcleanup()を呼び出さなければならない? –

+0

いいえ、あなたはしないでください!それは不必要なオーバーヘッドの多くを導入するだろう!ドキュメントの[ヒント]ボックスを比較してください:https://docs.confluent.io/current/streams/developer-guide.html#step-2-reset-the-local-environments-of-your-application-instances –

関連する問題