2017-11-03 1 views
1

私はコンシューマワーカーアプリケーションを持っています。これは内部でX個のスレッドを起動しています。各スレッドはそれをKafkaCosnumerにしています。 Cosnumersは同じgroupIdを持ち、同じトピックを購読しています。そのため、各消費者はパーティションの公平な分配を得ることができます。カフカリバランス。重複した処理の問題

処理の性質上、メッセージを失うことはできず、重複を許可することもできません。私が走っているカフカのバージョンは0.10.2.1です。

私が直面している問題は次のとおりです。消費者スレッド1がメッセージを消費し始め、poll()はメッセージのバッチを取得します。 ConsumerRebalanceListenerも実装しているため、メッセージが正常に処理されるたびにoffsetsマップに追加されます。 (以下のコードを参照してください)。一旦リバランスが行われると、パーティションが他のコンシューマに再割り当てされる前に、私はオフセットをコミットできます。 場合によっては、そのバッチを処理するために、max.poll.interval.msよりも時間がかかります。リバランスが発生し、コンシューマ1がコンシューマ1から引き出してコンシューマ2に割り当てられます。コンシューマ1は、そのパーティションが取り消され、その間に消費者2は最後のオフセット(RebalanceListenerによってコミットされた)からピックアップし、同じメッセージを処理します。

他のコンシューマに既に割り当てられているループ内のメッセージの処理を停止できるように、パーティションを破棄したことを消費者に知らせる方法はありますか?

public class RebalanceListener<K, V> implements ConsumerRebalanceListener { 

    private final KafkaConsumer<K, V> consumer; 

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS = 
      Maps.newConcurrentMap(); 

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class); 

    public RebalanceListener(KafkaConsumer<K, V> consumer) { 
     this.consumer = consumer; 
    } 

    public void addOffset(String topic, int partition, long offset) { 
     LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}", 
       topic, partition, offset); 
     CURRENT_OFFSETS.put(new TopicPartition(topic, partition), 
       new OffsetAndMetadata(offset, "commit")); 
    } 

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() { 
     return CURRENT_OFFSETS; 
    } 

    @Override 
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 
     LOGGER.debug("message=following partitions have been revoked from consumer: [{}]", 
       partitions.stream().map(
         topicPartition -> topicPartition.topic() + ":" + topicPartition.partition()) 
         .collect(joining(","))); 
     LOGGER.debug("message=Comitting offsets for partititions [{}]", 
       CURRENT_OFFSETS.keySet().stream().map(
         topicPartition -> topicPartition.topic() + ":" + topicPartition.partition()) 
         .collect(joining(","))); 
     consumer.commitSync(CURRENT_OFFSETS); 
    } 

    @Override 
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
     LOGGER.debug("message=following partitions have been assigned to consumer: [{}]", 
       partitions.stream().map(
         topicPartition -> topicPartition.topic() + ":" + topicPartition.partition()) 
         .collect(joining(","))); 
    } 

} 

私は、現在の消費者はまだレコード(各ConsumerRecordtopicpartitionフィールドを持っている)に関連付けられている場合は、すべて単一のメッセージのチェックを処理する前に、次にRebalanceListenerとの内部で作成されたconsumerId -- TopicPartitionの同時マップを持っていると思います。 もしそうでなければ、サイクルを壊して次のものを作りますpoll()

KafkaConsumerスレッドを複数回転させていても、私のワーカーアプリが1つのインスタンスで実行されている場合、これは実行可能なソリューションになります。しかし、私はそれをスケールアップした後、私はオフセットを隠すことはできませんし、静的マップ内の消費者 - トピックパーティションマッピング。それは、一種の集中型ストレージ、データベース、あるいは、Redisと言いましょう。

しかし、アイテムを処理するたびに、自分のレコードが現在の消費者スレッドによって正当に処理できるかどうかを尋ねなければなりません。スケーリングされたワーカーアプリの場合、それは外部記憶装置へのネットワークコールになります。カフカを使用する目的を無効にすると、処理が遅くなります。私は、単一のアイテムが処理された後にオフセットコミットを実行することを選択するかもしれません。

+0

先週カフカ1.0がリリースされました。 0.11の機能の1つは、[一度配信](https://www.confluent。 – StuartLC

+0

@StuartLC「正確に一度」セマンティクスが0.11で導入されたことは知っていますが、残念ながらまだアップグレードできませんそのバージョンに –

+0

@StuartLCだから、私はあなたが上にリンクした記事を読んだ。私は0.11の消費者が私のケースをどのように扱うことができるのかよく分かりません。 私は消費したメッセージに 'max.poll.interval.ms'以上の処理がかかる可能性があります。 (msgsのバッチ全体が処理された後にコミットします)。すべてのメッセージが処理されるまで、メッセージはプロセスループに入ります。いつの間にかリバランスが起こっています。再調整が行われても、私の古い消費者は処理を続けます。 その間に、他のコンシューマが取り消されたパーティションを引き継ぎます。メッセージは2回処理されます。 –

答えて

0
あなたはonPartitionsRevokedを(実装する必要が

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

すべての消費者プロセスは前 onPartitionsAssignedを呼び出す任意のプロセスにonPartitionsRevoked を呼び出すことが保証されます。したがって、オフセットやその他の状態が onPartitionsRevokedコールに保存されている場合、そのパーティションを引き継ぐプロセスの はonPartitionsAssigned コールバックによって状態をロードするまでに保存されることが保証されます。

+0

あなたは私の質問を慎重に読んでいないと思います。私が提供したスニペットでは、 'onPartitionsRevoked()'が実装されています。 問題は、処理を引き継ぐ消費者ではなく、死んだと考えられているが、 'poll() 'の最後の呼び出しで消費されたメッセージを処理しているものである –

+0

はい、しかし外部にはオフセットを格納していませんマニュアルで説明されているonPartitionsRevoked()実装。すでに取り消されたパーティションのために、カフカに戻ってオフセットをコミットできるとは思いません。 Kafkaへのオフセットをコミットしたい場合は、処理を終了し、コンシューマがパーティションを取り消す前に、その処理を行う必要があります。 –

+0

同じJVM内で実行されている複数のコンシューマ・スレッドがあります。私のリバランスのリスナーには 'addOffset()'メソッドがあり、静的な同時マップにオフセットを追加しています。それは私の消費者スレッドのすべてが同じオフセットマップを扱うことを意味します。 上記で引用したスニペットに戻ると、すべてのコンシューマプロセスが 'onPartitionsRevoked()'を呼び出すと言います。私のオフセットのすべては、そうしなければならないように内部的に保持されることを意味します。 –