ここではPythonのKafka consumer(バージョン1.3.1)を使用しています。実現したいのは、各メッセージをグループ単位で1回だけ消費することです。
前提は次のとおりです:
10個のパーティションがあります。各パーティションはオフセット0で始まります。
1つのコンシューマグループがあり、その中に複数のコンシューマ(1、2、3など)がいます。
時には、あるコンシューマがダウンしたり、再び稼働したりします。
したがって、グループのメンバーは変わる可能性があります。それでも、各パーティションの各メッセージは、グループ内のいずれか1つのコンシューマ(1または2または3)によって一度だけ消費される必要があります。
私のコードは次のとおりです。
# Create a consumer subscribed to 'my_topic' as a member of the consumer
# group 'writer.test', connecting through the broker at ip:9092.
# NOTE(review): auto_offset_reset='earliest' presumably only matters when the
# group has no committed offset for a partition — confirm against the
# kafka-python KafkaConsumer documentation.
# NOTE(review): no auto-commit settings are passed here, so the library
# defaults apply — verify what they are for version 1.3.1.
consumer = KafkaConsumer('my_topic',
bootstrap_servers=['ip:9092'],
auto_offset_reset='earliest',
max_partition_fetch_bytes=131072,
group_id='writer.test')
上記の設定で十分でしょうか?コメントを歓迎します。ありがとうございます。
UPDATE:
以下のコードを試してみました。パーティション670では毎回、各メッセージが同じグループ内の2つのコンシューマによって2回消費されることがあります。なぜでしょうか?何か問題があるのでしょうか?
# Consume 10000 messages from 'topic' as a member of group 'writer2.test',
# print partition/offset for the messages whose second field equals '670',
# then unsubscribe.
# NOTE(review): the original indentation of this snippet was lost; the
# statements below are presumably all inside test() — confirm.
def test():
#PULL FROM KAFKA
# Subscribe to 'topic' in group 'writer2.test'; auto_commit_interval_ms=500
# requests a 500 ms interval between automatic offset commits.
consumer = KafkaConsumer(
'topic',
bootstrap_servers=[ip],
auto_offset_reset='latest',
max_partition_fetch_bytes=131072,
auto_commit_interval_ms=500,
group_id='writer2.test')
# Print the result of one initial poll() (Python 2 print statement).
print consumer.poll()
# Pull exactly 10000 messages, one at a time, through the iterator interface.
for i in range(10000):
msg = next(consumer)
# msg[1] is printed as the partition and msg[2] as the offset; only
# messages whose msg[1] stringifies to '670' are reported.
if str(msg[1])=='670':
print 'partition= %s, offset= %s' % (msg[1], msg[2])
# NOTE(review): the consumer is only unsubscribed here; no explicit commit()
# or close() call is visible, so offsets consumed since the last auto-commit
# are presumably not committed before the next run — confirm against the
# kafka-python documentation.
consumer.unsubscribe()
# Script entry point: repeat the consume-and-unsubscribe cycle ten times.
# NOTE(review): indentation was lost; presumably sleep(5) and test() are both
# inside the loop, i.e. each run is preceded by a 5-second pause — confirm.
if __name__ == "__main__":
for i in range(10):
import time
time.sleep(5)
test()
出力1:
{}
partition= 670, offset= 224
partition= 670, offset= 225
partition= 670, offset= 226
partition= 670, offset= 227
partition= 670, offset= 228
partition= 670, offset= 229
partition= 670, offset= 230
partition= 670, offset= 231
partition= 670, offset= 232
partition= 670, offset= 233
partition= 670, offset= 234
partition= 670, offset= 235
partition= 670, offset= 236
partition= 670, offset= 237
partition= 670, offset= 238
partition= 670, offset= 239
partition= 670, offset= 240
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
別のウィンドウで同じファイルを実行したときの出力:
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
単一ブローカーと複数ブローカーのクラスタのどちらを使用していますか? –
@MoinuddinQuadri 複数ブローカーのクラスタです。何か違いがあるのでしょうか? – BAE
問題は、複数のコンシューマが存在することにあると思います。送信しているデータがすべてのコンシューマに受信されていますが、あなたはそれがただ1つのコンシューマだけに消費されることを望んでいる、という理解で合っていますか? –