2016-05-02 4 views
0

私のテストプログラムでは、私はリスナーを開始します。その後、ループでメッセージを送信します。 1つのメッセージを送信しても、そのメッセージをリスニングしていません。 2つのメッセージを送信すると、1つのメッセージを受信します.3を送信すると、2つのメッセージを受信します。なぜですか?なぜカフカ消費者が最初のメッセージを聞いていないのですか?

プロデューサー

KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes); 

if (log.isDebugEnabled()) { 
    log.debug("producing messages to topic : " + topic + "file : " + payload.get("name")); 
} 

for (int i = 0; i < 3; i++) { 
    producer.send(message); 
    System.out.println("producing .."); 
} 

消費者

public void run() { 

    try { 
     ConsumerIterator<byte[], byte[]> itr = m_stream.iterator(); 
     log.info("Kafka listener is ready to listen.."); 
     System.out.println("listens...."); 

     while (itr.hasNext()) { 
      byte[] data = itr.next().message(); 
      System.out.println("Message received : " + data); 
     } 
    } 
} 

消費者の特性

enable.auto.commit=true 

auto.commit.interval.ms=101 

session.timeout.ms=7000 

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

zookeeper.connect=zk1.xx\:2181 

heartbeat.interval.ms=1000 

auto.offset.reset=smallest 

serializer.class=kafka.serializer.DefaultEncoder 

bootstrap.servers=kk1.xx\:9092 

group.id=test 

consumer.timeout.ms=-1 

fetch.min.bytes=1 

receive.buffer.bytes=262144 
+0

カフカのトピックでいくつのパーティションを設定していますか? – jbarrueta

+0

@jbarruetaデフォルトのtopics..0パーティションです – Ratha

答えて

1

私はsのことで、これを固定私のプロデューサーのプロパティに従う。

request.required.acks=1 
関連する問題