を読み取ります プロデューサー:はKafkaConsumer私はカフカの例をテストしたいすべてのレコード
object ProducerApp extends App {
val topic = "topicTest"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 20)
{
val record = new ProducerRecord(topic, "key "+i," value "+i)
producer.send(record)
Thread.sleep(100)
}
}
消費者:
object ConsumerApp extends App {
val topic = "topicTest"
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(scala.List(topic).asJava)
while (true) {
val records:ConsumerRecords[String,String] = consumer.poll(200)
println("records size "+records.count())
}
}
トピック "topicTestは、" 1つのパーティションで作成されます。
records size 21
records size 21
records size 21
records size 21
...
が、得られた結果は次のとおり:
期待される結果である
records size 21
records size 0
records size 21
records size 0
records size 21
records size 0
...
消費者が交互にレコードを読み出します。私はその理由を理解したい。 ありがとうございます
質問は何ですか? 0レコードが表示される理由を説明する必要がありますか? –
はい、私は0レコードを見る理由を知りたいです – DaliMidou
ある時点ではまだデータがありませんので、ポーリングは200ms後に戻りますが、取り出すデータがないので空を返しますリスト。 IMHO –