2017-11-02 1 views
0

を読み取ります プロデューサー:はKafkaConsumer私はカフカの例をテストしたいすべてのレコード

object ProducerApp extends App { 

val topic = "topicTest" 
val props = new Properties() 
props.put("bootstrap.servers", "localhost:9092") 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer") 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
val producer = new KafkaProducer[String, String](props) 
for(i <- 0 to 20) 
{  
val record = new ProducerRecord(topic, "key "+i," value "+i)  
producer.send(record)  
Thread.sleep(100)  
} 
} 

消費者:

object ConsumerApp extends App { 
val topic = "topicTest" 
val properties = new Properties 
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer") 
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") 
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
val consumer = new KafkaConsumer[String, String](properties) 
consumer.subscribe(scala.List(topic).asJava)  
while (true) { 
val records:ConsumerRecords[String,String] = consumer.poll(200) 
println("records size "+records.count())  
} 
} 

トピック "topicTestは、" 1つのパーティションで作成されます。

records size 21 
records size 21 
records size 21 
records size 21 
... 

が、得られた結果は次のとおり:

期待される結果である

records size 21 
records size 0 
records size 21 
records size 0 
records size 21 
records size 0 
... 

消費者が交互にレコードを読み出します。私はその理由を理解したい。 ありがとうございます

+0

質問は何ですか? 0レコードが表示される理由を説明する必要がありますか? –

+0

はい、私は0レコードを見る理由を知りたいです – DaliMidou

+0

ある時点ではまだデータがありませんので、ポーリングは200ms後に戻りますが、取り出すデータがないので空を返しますリスト。 IMHO –

答えて

0

私の意見では、この現象が表示される理由は、poll()に設定した短いタイムアウトになる可能性があります。 200ミリ秒のタイムアウトは、オフセットリセット後に内部pollOnce()link)を再試行するには不十分な場合があります。消費者は、オフセットリセットを必要とし、レコードを返さない1つのpollOnce()の後にタイムアウトします。 poll()が呼び出されたときのあなたのアプリの次のループでは、オフセットはすでに必要なものです。したがって、レコードを取得するには、おそらく200ミリ秒で十分です。 3番目の呼び出しでは、上記の動作が繰り返されます。 を呼び出すと、すぐにオフセットがリセットされるのではなく、オフセットリセットのためにパーティションにフラグが立てられることに注意してください。実際のリセットは、poll()またはposition()への次の呼び出しで発生します。

poll()タイムアウトを増やしてください。それでは、あなたが得ている出力record size 0を取り除いてください。

関連する問題