2017-02-19 18 views
2

ソーストピックパーティション数が1の場合は正常に動作します。パーティションを1より大きい値にバンプアップすると、以下のエラーが表示されます。低レベルとDSL APIの両方に適用できます。すべてのポインタ?何が失われる可能性がありますか?複数のトピックパーティションを持つKafkaストリームのリバランスエラーに失敗しました

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) 
     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) 
     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) 
     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) 
     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) 
     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) 
     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) 
     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) 

答えて

3

操作上の問題です。 Kafka Streamsでは、入力されたトピックパーティションの数を「ライフタイム」中に変更することはできません。

実行中のKafka Streamsアプリケーションを停止した場合は、入力したトピックパーティションの数を変更して、アプリケーションを再起動してください(上記のエラーが表示されます)。これを運用用のケースで修正するのは難しいですが、ではなく、で入力トピックパーティションの数を変更することを強くお勧めします(後述のコメントを参照)。 POC /デモの場合は修正するのが難しくありません。

この問題を解決するためには、あなたはカフカのアプリケーションリセットツール使用してアプリケーションをリセットする必要があります。アプリケーションのリセットツールを使用して

を、あなたはワイプ欠点を持っていますあなたのアプリケーションの状態全体を取り出す。したがって、アプリケーションを以前と同じ状態にするには、入力トピック全体を最初から再処理する必要があります。これはもちろん、すべての入力データがまだ利用可能で、ブローカーによってトピック保持時間/サイズポリシーを適用するものが削除されていない場合にのみ可能です。

さらに、トピックを入力するパーティションを追加すると、トピックのパーティション化スキーマが変更されることに注意してください(キーごとのデフォルトのハッシュベースのパーティション化)。 Kafka Streamsでは、入力トピックがキーによって正しく分割されていることを前提としているため、リセットツールを使用してすべてのデータを再処理すると、「古い」データが「新しい」データとは別にパーティション化されるため誤った結果になることがあります。新しいパーティション)。実稼働のユースケースでは、元のトピックのすべてのデータを読み込み、データを正しく分割するために新しいトピック(パーティション数を増やしたもの)に書き込む必要があります(または、このステップでは、キー - 通常は問題ではないはずのもの - 言及したいだけです)。その後、新しいトピックをStreamsアプリケーションの入力トピックとして使用できます。

元のトピックを読んだ後で実際の処理を行う前に、オペレータthrough("new_topic_with_more_partitions")を使用してStreamsアプリケーション内で簡単に再パーティション化を行うこともできます。

しかし、一般的に、後でパーティションの数を変更する必要がないように、実用的な使用例のトピックを過度にパーティション分割することをお勧めします。オーバー・パーティショニングのオーバーヘッドはかなり小さく、後で多くの手間を省きます。 Kafkaを使用する場合は、Streamsの使用例に限定されない一般的な推奨事項です。

つ以上の発言:

一部の人々は、手動でカフカストリーム内部話題のパーティションの数を増やすことをお勧めかもしれません。まず、これはハックであり、特定の理由によりは推奨されません。です。

  1. さまざまな要因(ストリームの内部実装の詳細)に応じて、正しい数値が何であるか把握するのは難しいかもしれません。
  2. また、上記の段落で説明したように、パーティションスキームを破るという問題に直面しています。したがって、あなたのアプリケーションは、おそらく一貫性のない状態に終わるでしょう。

Streamsは、アプリケーションの状態が矛盾しないように、内部トピックの削除や内部トピックのパーティション数の自動変更は行いません。これにより、ユーザーは手動で「クリーンアップ」を実行することにより、すべての意味を認識することができます。

ところで

:今後のカフカ0.10.2の場合、このエラーメッセージが改善されました:https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103

関連する問題