0
このコードは時々私に最初からすべてのメッセージを与え、別のメッセージを待っていると、時にはそれだけで話題からのすべてのメッセージは、それらを送信されますされて私は何をしなければならない別のメッセージどのようにkafkaトピックからすべてのメッセージを取得し、それらをJavaを使用して数えますか?
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class TestConsumer{
public static void main(String[] args) {
ConsumerConfig config;
Properties props = new Properties();
props.put("zookeeper.connect","sandbox.hortonworks.com:2181");
props.put("group.id", "group-4");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "200");
config = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector
(config);
String topic = "News";
System.out.println("Running");
Run(consumer,topic);
}
public static void Run(ConsumerConnector consumer,String topic){
HashMap<String,Integer> topicCountMap =
new HashMap<String,Integer>();
topicCountMap.put(topic, 1);
Map<String,List<KafkaStream<byte[],byte[]>>>
consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[],byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it = stream.iterator();
List<String> msgTopicList = new ArrayList<String>();
int count = 0;
System.out.println("Waiting");
while(it.hasNext()){
MessageAndMetadata<byte[],byte[]> msgAndData = it.next();
String msg = new String(msgAndData.message());
msgTopicList.add(msg);
String key = "NoKey";
System.out.println(msg);
count++;
}
}
}
を待っていますユーザーとカウントする
これを行うにはどのような方法が最適ですか?
バージョンkafka_2.10-0.8.1.2.2.4.2-2
あなたがいつも話題の非常に最初から読みたいですか? yesの場合、props.put( "auto.offset.reset"、 "smallest")を使用してください。 auto-commitを無効にします:props.put( "enable.auto.commit"、false); –