2016-08-29 8 views
0

ここではKafka consumer(バージョン1.3.1)を使用しています。私はacheiveするつもりです何python kafka:各メッセージをグループ単位で1回だけ消費する方法

  • 10個のパーティションがあります。各パーティションはオフセット0で始まります。

  • 消費者グループ(1,2,3、など)があります。

  • 時には、1つの消費者がダウンしているか、稼働中です。

  • したがって、グループのメンバーは変更される可能性があります。しかし、私は、各パーティションの各メッセージをグループで一度だけ消費する必要があります(1 OR 2 OR 3)。

私のコードは次のとおりです。

consumer = KafkaConsumer('my_topic', 
      bootstrap_servers=['ip:9092'], 
      auto_offset_reset='earliest', 
      max_partition_fetch_bytes=131072, 
      group_id='writer.test') 

上記の構成は十分ですか?コメントは歓迎されます。おかげ

UPDATE

は、私は以下のコードを試してみました。パーティション760で毎回、各メッセージは、1つのグループ内の2人の消費者によって2回消費されてもよい。どうして?何か問題でも?

def test(): 
    #PULL FROM KAFKA 
    consumer = KafkaConsumer(
      'topic', 
      bootstrap_servers=[ip], 
      auto_offset_reset='latest', 
      max_partition_fetch_bytes=131072, 
      auto_commit_interval_ms=500, 
      group_id='writer2.test') 

    print consumer.poll() 
    for i in range(10000): 
     msg = next(consumer) 
     if str(msg[1])=='670': 
      print 'partition= %s, offset= %s' % (msg[1], msg[2]) 
    consumer.unsubscribe() 


if __name__ == "__main__": 
    for i in range(10): 
     import time 
     time.sleep(5) 
     test() 

出力1:

{} 
partition= 670, offset= 224 
partition= 670, offset= 225 
partition= 670, offset= 226 
partition= 670, offset= 227 
partition= 670, offset= 228 
partition= 670, offset= 229 
partition= 670, offset= 230 
partition= 670, offset= 231 
partition= 670, offset= 232 
partition= 670, offset= 233 
partition= 670, offset= 234 
partition= 670, offset= 235 
partition= 670, offset= 236 
partition= 670, offset= 237 
partition= 670, offset= 238 
partition= 670, offset= 239 
partition= 670, offset= 240 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 

を実行し、別のウィンドウで同じファイル、出力:

{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
+0

単一ブローカーまたは複数ブローカークラスターを使用していますか? –

+0

@MoinuddinQuadri複数のブローカークラスタ。違いは? – BAE

+0

私はあなたの問題は、複数の消費者がいると信じています。あなたが放送しているデータは、すべての消費者によって受信されています。しかし、あなたはそれがただ一つの消費者によって消費されることを望んでいます。私は正しいですか? –

答えて

0

あなたが消費者のグループを使用する場合は、カフカがで-少なくともワンス配信保証提供したがって、消費者がそれらの消費者のパーティションを再割り当てできない場合、いくつかのメッセージが2回目に配信される可能性がある。

メッセージが2回処理されないようにするには、パターンを一番早い配送保証に切り替えることができます。しかし、このために、障害が発生した場合にメッセージを失う可能性があります。

pollで受信したメッセージの処理を開始する前に、自動コミットを無効にして、pollの後に手動指示をコミットする必要があります。

詳細については、http://docs.confluent.io/3.0.0/clients/consumer.html#detailed-examplesを参照してください(例がPythonでない場合でも、一般的なパターンは同じです)。

関連する問題