2016-05-09 4 views
0

このコードは時々私に最初からすべてのメッセージを与え、別のメッセージを待っていると、時にはそれだけで話題からのすべてのメッセージは、それらを送信されますされて私は何をしなければならない別のメッセージどのようにkafkaトピックからすべてのメッセージを取得し、それらをJavaを使用して数えますか?

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 

public class TestConsumer{ 

public static void main(String[] args) { 
    ConsumerConfig config; 
    Properties props = new Properties(); 
    props.put("zookeeper.connect","sandbox.hortonworks.com:2181"); 
    props.put("group.id", "group-4"); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "200"); 
    config = new ConsumerConfig(props); 
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector 
      (config); 
    String topic = "News"; 
    System.out.println("Running"); 
    Run(consumer,topic); 
} 

public static void Run(ConsumerConnector consumer,String topic){ 
    HashMap<String,Integer> topicCountMap = 
      new HashMap<String,Integer>(); 
    topicCountMap.put(topic, 1); 
    Map<String,List<KafkaStream<byte[],byte[]>>> 
    consumerMap = consumer.createMessageStreams(topicCountMap); 
    KafkaStream<byte[],byte[]> stream = consumerMap.get(topic).get(0); 
    ConsumerIterator<byte[],byte[]> it = stream.iterator(); 
    List<String> msgTopicList = new ArrayList<String>(); 
    int count = 0; 
    System.out.println("Waiting"); 
    while(it.hasNext()){ 
     MessageAndMetadata<byte[],byte[]> msgAndData = it.next(); 
     String msg = new String(msgAndData.message()); 
     msgTopicList.add(msg); 
     String key = "NoKey"; 
     System.out.println(msg); 
     count++; 
    } 
} 
} 

を待っていますユーザーとカウントする

これを行うにはどのような方法が最適ですか?

バージョンkafka_2.10-0.8.1.2.2.4.2-2

+0

あなたがいつも話題の非常に最初から読みたいですか? yesの場合、props.put( "auto.offset.reset"、 "smallest")を使用してください。 auto-commitを無効にします:props.put( "enable.auto.commit"、false); –

答えて

0

はここにあなたの一例です。

ここで最も重要なのは、カフカ、消費者の構成プロパティです:

は、キューの先頭から開始します。

props.put("auto.offset.reset", "smallest"); 

このコンシューマのオフセットは保存されません。

props.put("auto.commit.enable", "false"); 

メッセージがもう使用できなくなった場合、メッセージを5秒間待機します。

props.put("consumer.timeout.ms", "5000"); 

全例:

package com.xxx.yyy.zzz; 

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerTimeoutException; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class KafkaConsumer { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private int count = 0; 

    public KafkaConsumer(final String zookeeper, final String groupId, final String topic) { 
     this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId)); 
     this.topic = topic; 
    } 

    // Initialize connection properties to Kafka and Zookeeper 
    private static ConsumerConfig createConsumerConfig(final String zookeeper, final String groupId) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeper); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "2000"); 
     props.put("zookeeper.sync.time.ms", "250"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("auto.offset.reset", "smallest"); 
     props.put("auto.commit.enable", "false"); 
     props.put("consumer.timeout.ms", "5000"); 

     return new ConsumerConfig(props); 
    } 

    private void getData() { 
     List<byte[]> msgs = new ArrayList(); 
     Map<String, Integer> topicMap = new HashMap<>(); 

     // Define single thread for topic 
     topicMap.put(topic, 1); 
     try { 
      Map<String, List<KafkaStream<byte[], byte[]>>> listMap = consumer.createMessageStreams(topicMap); 
      List<KafkaStream<byte[], byte[]>> kafkaStreams = listMap.get(topic); 

      // Collect the messages. 
      kafkaStreams.forEach(ks -> ks.forEach(mam -> msgs.add(mam.message()))); 

     } catch (ConsumerTimeoutException exception) { 
      // There no more messages available -> so, we are done. 

      // Now print all your messages 
      msgs.forEach(System.out::println); 

      // count them 
      count = msgs.size(); 
     } finally { 
      if (consumer != null) { 
       consumer.shutdown(); 
      } 
     } 
    } 
} 
関連する問題