2017-02-03 39 views
1

これは、Apacheカフカのための消費者であり、この結果では、トピック「テスト」Javaアプリケーションからapache kafkaのトピックを購読するには?

package com.kafka; 

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 

public class ConsumerTest { 

    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "172.17.210.45:9092"); 
    props.put("zookeeper.connect", "172.17.210.45:2181"); 
    props.put("group.id", "test-consumer-group"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); 
    System.out.println("properties loaded"); 
    kafkaConsumer.subscribe(Arrays.asList("test")); 

    while (true) { 
     ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.printf("offset = %d, value = %s", record.offset(), record.value()); 
      System.out.println(); 
     } 
    } 

    } 
} 

からメッセージを取得していないのapacheカフカからのメッセージを取得しておりません。

 log4j:WARN No appenders could be found for logger                            (org.apach e.kafka.clients.consumer.ConsumerConfig). 
    log4j:WARN Please initialize the log4j system properly. 
properties loaded 
+0

あなたが何か間違っていたことが見つかりましたか? – freedev

+0

コンシューマーの実行中にメッセージを生成していますか? – oh54

+0

[Javaからカフカのトピックを作成する方法](http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java) – Teja

答えて

0

コードが正しいようです。 ip 172.17.210.45に到達できるかどうかを制御することをお勧めします。

ping 172.17.210.45 

telnet 172.17.210.45 9092 
telnet 172.17.210.45 2181 

サーバー

bin/kafka-topics.sh --list --zookeeper 172.17.210.45:2181 

上の既存のトピックを確認後、あなたは最初にあなたの消費者を移動しようとすることができます(この行はkafkaConsumer.subscribe後に追加する必要があります。

kafkaConsumer.seekToBeginning(Collections.emptyList()); 

最後に、kafkaConsumer.poll(100)の後に、無限ループ内に少しだけSystem.out.println(records.size)行を追加することをお勧めします。レコードを待っているかどうかを確認するだけです。

UPDATE

あなたはプロデューサー部に一つ以上のgroup.idをお持ちの場合は、消費者の一部には、それらのいずれかを使用する必要があります。

+0

@freedev ...もし私がkafkaConsumer.seekToBeginning(Collections.emptyList()); – Teja

+0

ここではトピックのリストを取得していますが、特定のトピックのメッセージを取得していません... – Teja

+0

"test"というトピックを読んでいる場合は、そのトピックが存在すると仮定します。私はまた、プロデューサパートで使用されているのと同じグループIDを必ず使用することをお勧めします。 – freedev

関連する問題