2017-02-13 6 views
2

私はkafkaからのメッセージを消費するJavaプログラムを作成しました。私は消費の遅れを監視したい、どのようにjavaでそれを取得するには?kafkaがJavaプログラムで遅延を消費する方法

はところで、私が使用します。

<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.11</artifactId> 
<version>0.10.1.1</version> 

感謝を事前に。

+0

私はそれが書かれたときに、メッセージにタイムスタンプを追加し、それが終了タイミングに終わりを取得するために読み込まれた時にこれを比較します。 –

+0

実際、ラグカウントを取得したいのですが、 – jamee

+0

ラグカウントは何を使用しますか? –

答えて

1

私は個人的に私の消費者から直接jmx情報を問い合わせます。私はJavaでしか消費しないので、JMX bean:kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-maxが利用可能です。

jolokiaがクラスパスにある場合、/jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-maxのGETで値を取得し、すべての結果を1か所に集めることができます。

また、設定が非常に簡単なBurrowもありますが、これは少し古くなっています(よく覚えていれば0.10で動作しません)。

+0

jmxのJMX beanを確認しました:kafka.server:type = FetcherLagMetrics、name = ConsumerLag、clientId = mygroup、topic = mytpoic、partition = *ラグ情報を含んでいます – jamee

0

AdminClient#listGroupOffsets(groupID)を使用して、コンシューマのグループに関連付けられているすべてのトピックパーティションのオフセットを取得してみます。たとえば、次のように

AdminClient client = AdminClient.createSimplePlaintext("localhost:9092"); 
Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
    client.listGroupOffsets("groupID")); 
Long offset = (Long) offsets.get(new TopicPartition("topic", 0)); 
... 

EDIT:ショー上記
スニペット約束が与えられたパーティションのオフセットを取得する方法。次のコードは、パーティションのLEOを取得する方法を示しています。 getLogEndOffset戻り、所与のパーティションのLEOを呼び出す

public long getLogEndOffset(TopicPartition tp) { 
    KafkaConsumer consumer = createNewConsumer(); 
    Collections.singletonList(tp); 
    consumer.assign(Collections.singletonList(tp)); 
    consumer.seekToEnd(Collections.singletonList(tp)); 
    return consumer.position(tp); 
} 

private KafkaConsumer<String, String> createNewConsumer() { 
    Properties properties = new Properties(); 
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); 
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    return new KafkaConsumer(properties); 
} 

は、それからオフセットコミット結果が遅れている引きます。

+0

AdminClientはどこから来たのですか?あなたは私に依存関係を与えることができますか?どうもありがとう。 – jamee

+0

すでにkafka_2.11を使っているkafka.adminパッケージから来ています。 – amethystic

+0

これは、現在のオフセットを示します。コミットされたオフセットはどのように取得できますか? –

0

kafka(およびscala)依存関係をプロジェクトに含める必要がない場合は、以下のクラスを使用できます。 kafka-clients依存関係のみを使用します。

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.OffsetAndMetadata; 
import org.apache.kafka.common.PartitionInfo; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 

import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.UUID; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.function.BinaryOperator; 
import java.util.stream.Collectors; 

public class KafkaConsumerMonitor { 

    public static class PartionOffsets { 
     private long endOffset; 
     private long currentOffset; 
     private int partion; 
     private String topic; 

     public PartionOffsets(long endOffset, long currentOffset, int partion, String topic) { 
      this.endOffset = endOffset; 
      this.currentOffset = currentOffset; 
      this.partion = partion; 
      this.topic = topic; 
     } 

     public long getEndOffset() { 
      return endOffset; 
     } 

     public long getCurrentOffset() { 
      return currentOffset; 
     } 

     public int getPartion() { 
      return partion; 
     } 

     public String getTopic() { 
      return topic; 
     } 
    } 

    private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString(); 

    public Map<TopicPartition, PartionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) { 
     Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host); 


     KafkaConsumer consumer = createNewConsumer(groupId, host); 

     BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> { 
      throw new IllegalStateException(); 
     }; 

     Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet() 
       .stream() 
       .collect(Collectors.toMap(
         entry -> (entry.getKey()), 
         entry -> { 
          OffsetAndMetadata committed = consumer.committed(entry.getKey()); 
          return new PartionOffsets(entry.getValue(), committed.offset(), entry.getKey().partition(), topic); 
         }, mergeFunction)); 


     return result; 
    } 

    public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) { 
     Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>(); 
     KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host); 
     List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); 
     List<TopicPartition> topicPartitions = partitionInfoList.stream().map(pi -> new TopicPartition(topic, pi.partition())).collect(Collectors.toList()); 
     consumer.assign(topicPartitions); 
     consumer.seekToEnd(topicPartitions); 
     topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition))); 
     consumer.close(); 
     return endOffsets; 
    } 

    private static KafkaConsumer<?, ?> createNewConsumer(String groupId, String host) { 
     Properties properties = new Properties(); 
     properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host); 
     properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
     properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new KafkaConsumer<>(properties); 
    } 
} 
関連する問題