2016-10-25 4 views
1

私はkafka-0.10.0.0で簡単なデモをやろうとしています。 私のプロデューサーは大丈夫ですが、コンシューマーが正しくない可能性があります。カフカ0.10:消費者に何が問題なのですか?

 Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("group.id", "group1"); 
     props.put("enable.auto.commit", "false"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 


     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
     consumer.subscribe(Arrays.asList("topictest2")); 

     while (true) { 
      ConsumerRecords<String, String> records = consumer.poll(100); 


      for (TopicPartition partition : records.partitions()) { 

       List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 

       for (ConsumerRecord<String, String> record : partitionRecords) { 

        System.out.println("Thread = "+Thread.currentThread().getName()+" "); 
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s",record.partition(), record.offset(), record.key(), record.value()); 
        System.out.println("\n"); 
       } 
       // consumer.commitSync(); 
       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
      } 

     } 

このデモを実行すると、出力はありません!私のコードの問題は何ですか?

答えて

0

有効と思われます。

は私が auto.offset.resetデフォルトであるため、プログラムはちょうど新しいメッセージを待っていると思う最新

あなたがそのトピックでいくつかのメッセージを持っているし、それらを読みたい場合は、追加しよう

props.put("auto.offset.reset", "earliest"); 

最初からトピックを読み込み、group.idを一意のものにリセットして、保存されたオフセットから継続しないようにするか、オフセットをまったくコミットしないようにしてください。グループIDの場合は、auto.offset.resetをスキップします。

props.put("group.id", "group."+UUID.randomUUID().toString()); 
関連する問題