2016-03-22 3 views
0

私はJava Kafkaコンシューマを実装しようとしています。私はKafkaサーバーバージョン0.9を使用します。 これはテスト用ですから、私がしなければならないのは、1つのメッセージを読むことだけです。返されたレコードオブジェクト1つのメッセージを読むことができませんでした。JavaベースのKafkaコンシューマ

public static ConsumerRecords<String, String> readFromKafka() { 
ConsumerRecords<String, String> records = null; 
try { 
    Properties kafkaProps = new Properties(); 
    kafkaProps.put("bootstrap.servers", "<KAFKA_SERVER_HOST>:9092"); 
    kafkaProps.put("auto.commit.enable", "false"); 
    kafkaProps.put("value.deserializer", StringDeserializer.class.getName()); 
    kafkaProps.put("key.deserializer", StringDeserializer.class.getName()); 
    kafkaProps.put("client.id", "testScore0"); 
    kafkaProps.put("group.id", "testScore1"); 
    kafkaProps.put("auto.offset.reset", "latest"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps); 
    consumer.subscribe(Arrays.asList("my_topic")); 

    records = consumer.poll(0); 

    } catch (Exception e) { 
    logger.error("Can not read from kafka", e); 
    } 
    return records; 
} 

空です:

enter image description here

私は同じKAFKA_SERVER_HOSTに接続し、メッセージを得るか、私のローカルマシン上でコマンドラインカフカの消費者を実行します。

答えて

1

0よりも大きな何かのため

records = consumer.poll(0); 

上の変更ポーリング時間を、100

records = consumer.poll(100); 
てみてください
関連する問題