私はコンシューマワーカーアプリケーションを持っています。これは内部でX
個のスレッドを起動しています。各スレッドはそれをKafkaCosnumerにしています。 Cosnumersは同じgroupId
を持ち、同じトピックを購読しています。そのため、各消費者はパーティションの公平な分配を得ることができます。カフカリバランス。重複した処理の問題
処理の性質上、メッセージを失うことはできず、重複を許可することもできません。私が走っているカフカのバージョンは0.10.2.1です。
私が直面している問題は次のとおりです。消費者スレッド1がメッセージを消費し始め、poll()
はメッセージのバッチを取得します。 ConsumerRebalanceListener
も実装しているため、メッセージが正常に処理されるたびにoffsets
マップに追加されます。 (以下のコードを参照してください)。一旦リバランスが行われると、パーティションが他のコンシューマに再割り当てされる前に、私はオフセットをコミットできます。 場合によっては、そのバッチを処理するために、max.poll.interval.ms
よりも時間がかかります。リバランスが発生し、コンシューマ1がコンシューマ1から引き出してコンシューマ2に割り当てられます。コンシューマ1は、そのパーティションが取り消され、その間に消費者2は最後のオフセット(RebalanceListenerによってコミットされた)からピックアップし、同じメッセージを処理します。
他のコンシューマに既に割り当てられているループ内のメッセージの処理を停止できるように、パーティションを破棄したことを消費者に知らせる方法はありますか?
public class RebalanceListener<K, V> implements ConsumerRebalanceListener {
private final KafkaConsumer<K, V> consumer;
private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
Maps.newConcurrentMap();
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
public RebalanceListener(KafkaConsumer<K, V> consumer) {
this.consumer = consumer;
}
public void addOffset(String topic, int partition, long offset) {
LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
topic, partition, offset);
CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
new OffsetAndMetadata(offset, "commit"));
}
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
return CURRENT_OFFSETS;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
LOGGER.debug("message=Comitting offsets for partititions [{}]",
CURRENT_OFFSETS.keySet().stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
consumer.commitSync(CURRENT_OFFSETS);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
partitions.stream().map(
topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
.collect(joining(",")));
}
}
私は、現在の消費者はまだレコード(各ConsumerRecord
がtopic
とpartition
フィールドを持っている)に関連付けられている場合は、すべて単一のメッセージのチェックを処理する前に、次にRebalanceListener
との内部で作成されたconsumerId -- TopicPartition
の同時マップを持っていると思います。 もしそうでなければ、サイクルを壊して次のものを作りますpoll()
。
KafkaConsumerスレッドを複数回転させていても、私のワーカーアプリが1つのインスタンスで実行されている場合、これは実行可能なソリューションになります。しかし、私はそれをスケールアップした後、私はオフセットを隠すことはできませんし、静的マップ内の消費者 - トピックパーティションマッピング。それは、一種の集中型ストレージ、データベース、あるいは、Redisと言いましょう。
しかし、アイテムを処理するたびに、自分のレコードが現在の消費者スレッドによって正当に処理できるかどうかを尋ねなければなりません。スケーリングされたワーカーアプリの場合、それは外部記憶装置へのネットワークコールになります。カフカを使用する目的を無効にすると、処理が遅くなります。私は、単一のアイテムが処理された後にオフセットコミットを実行することを選択するかもしれません。
先週カフカ1.0がリリースされました。 0.11の機能の1つは、[一度配信](https://www.confluent。 – StuartLC
@StuartLC「正確に一度」セマンティクスが0.11で導入されたことは知っていますが、残念ながらまだアップグレードできませんそのバージョンに –
@StuartLCだから、私はあなたが上にリンクした記事を読んだ。私は0.11の消費者が私のケースをどのように扱うことができるのかよく分かりません。 私は消費したメッセージに 'max.poll.interval.ms'以上の処理がかかる可能性があります。 (msgsのバッチ全体が処理された後にコミットします)。すべてのメッセージが処理されるまで、メッセージはプロセスループに入ります。いつの間にかリバランスが起こっています。再調整が行われても、私の古い消費者は処理を続けます。 その間に、他のコンシューマが取り消されたパーティションを引き継ぎます。メッセージは2回処理されます。 –