
I am new to Kafka. I am testing Kafka with two ZooKeeper instances and two broker instances. I created a test topic "topicA"; its description is below. My Kafka consumer does not consume any data.

Topic:topicA PartitionCount:1  ReplicationFactor:1  Configs: 
     Topic: topicA Partition: 0 Leader: 2  Replicas: 2  Isr: 2 

The topic has one partition, on Kafka broker 2, and its only replica is on the same broker. I am sending messages to the broker with the Kafka producer (org.apache.kafka.kafka-clients 0.9.0.1).

Producer config:

props.put("bootstrap.servers", "***:12900"); // this is kafka broker url 
props.put("block.on.buffer.full", "true"); 
props.put("request.required.acks", "1"); 
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
props.put("partition.assignment.strategy", "range"); 

I send 10K messages from the producer.

kafkaProducer.send(new ProducerRecord<String, String>(
        topic,"partitionName", 
        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i))); 

System.out.println("Sent Message - " + i + " Successfully"); 

My consumer does not receive any messages.

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + "---->" + record.value());
    }
}
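For reference, the loop above only ever sees data if the consumer has been subscribed (or assigned) to the topic beforehand; if subscribe() is never called, poll() simply returns empty record sets. A minimal, self-contained 0.9 consumer sketch (broker address and group id are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-host:12900"); // placeholder broker address
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topicA")); // without this, poll() returns nothing

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + "---->" + record.value());
            }
        }
    }
}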

My consumer props:

bootstrap.servers = *:12900 // this is my kafka broker 
group.id = test 
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
enable.auto.commit=true 
# fast session timeout makes it more fun to play with failover 
session.timeout.ms=10000 

# These buffer sizes seem to be needed to avoid consumer switching to 
# a mode where it processes one bufferful every 5 seconds with multiple 
# timeouts along the way. No idea why this happens. 
fetch.min.bytes=50000 
receive.buffer.bytes=262144 
max.partition.fetch.bytes=2097152 

Error on broker 1: the BufferUnderFlowException below is repeated many times.

[Controller-1-to-broker-1-send-thread], Controller 1 epoch 6 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:1;ControllerEpoch:6;CorrelationId:10;ClientId:id_1-host_null-port_12900;Leaders:id:1,host:*,port:12900,id:2,host:*,port:12900;PartitionState:(__consumer_offsets,32) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,16) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,49) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,44) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,28) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,17) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,23) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,7) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,4) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,29) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,35) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,3) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,24) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,41) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,38) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,13) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,8) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,5) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,39) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,36) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,40) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,45) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,15) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,33) -> 
(LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,37) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,21) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,6) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,11) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,20) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,47) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,2) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,27) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,34) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,9) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,22) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,42) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,14) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,25) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,10) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,48) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,31) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,18) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,19) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,12) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,46) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,43) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,1) -> (LeaderAndIsrInfo:(Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:2,1),(__consumer_offsets,26) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2),(__consumer_offsets,30) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,2,LeaderEpoch:0,ControllerEpoch:6),ReplicationFactor:2),AllReplicas:1,2) to broker id:1,host:*,port:12900. Reconnecting to broker. 
java.io.IOException: Broken pipe 
kafka-request-handler-0]: [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [__consumer_offsets,32],[__consumer_offsets,16],[__consumer_offsets,44],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_offsets,24],[__consumer_offsets,0],[__consumer_offsets,38],[__consumer_offsets,8],[__consumer_offsets,36],[__consumer_offsets,40],[__consumer_offsets,6],[__consumer_offsets,20],[__consumer_offsets,2],[__consumer_offsets,34],[__consumer_offsets,22],[__consumer_offsets,42],[__consumer_offsets,14],[__consumer_offsets,10],[__consumer_offsets,48],[__consumer_offsets,18],[__consumer_offsets,12],[__consumer_offsets,46],[__consumer_offsets,26],[__consumer_offsets,30] 
2016-07-31 06:48:11,045 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-0 with log end offset 0 
2016-07-31 06:48:11,054 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,0] in log/kafka_1 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,058 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,0] on broker 1: No checkpointed highwatermark is found for partition [__consumer_offsets,0] 
2016-07-31 06:48:11,069 [INFO ][kafka-scheduler-4]: Loading offsets from [__consumer_offsets,0] 
2016-07-31 06:48:11,072 [INFO ][kafka-scheduler-4]: Finished loading offsets from [__consumer_offsets,0] in 3 milliseconds. 
2016-07-31 06:59:31,945 [ERROR][kafka-network-thread-12900-2]: Closing socket for /host because of error 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException 
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66) 
    at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85) 
    at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50) 
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50) 
    at kafka.network.Processor.read(SocketServer.scala:450) 
    at kafka.network.Processor.run(SocketServer.scala:340) 

Log on broker 2 (no errors on this broker):

2016-07-31 06:48:10,972 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [__consumer_offsets,49],[__consumer_offsets,17],[__consumer_offsets,23],[__consumer_offsets,7],[__consumer_offsets,29],[__consumer_offsets,35],[__consumer_offsets,3],[__consumer_offsets,41],[__consumer_offsets,13],[__consumer_offsets,5],[__consumer_offsets,39],[__consumer_offsets,45],[__consumer_offsets,15],[__consumer_offsets,33],[__consumer_offsets,37],[__consumer_offsets,21],[__consumer_offsets,11],[__consumer_offsets,47],[__consumer_offsets,27],[__consumer_offsets,9],[__consumer_offsets,25],[__consumer_offsets,31],[__consumer_offsets,19],[__consumer_offsets,43],[__consumer_offsets,1] 
2016-07-31 06:48:10,990 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-29 with log end offset 0 
2016-07-31 06:48:10,994 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,29] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:10,996 [WARN ][kafka-request-handler-0]: Partition [__consumer_offsets,29] on broker 2: No checkpointed highwatermark is found for partition [__consumer_offsets,29] 
2016-07-31 06:48:10,998 [INFO ][kafka-scheduler-5]: Loading offsets from [__consumer_offsets,29] 
2016-07-31 06:48:11,011 [INFO ][kafka-scheduler-5]: Finished loading offsets from [__consumer_offsets,29] in 13 milliseconds. 
2016-07-31 06:48:11,023 [INFO ][kafka-request-handler-0]: Completed load of log __consumer_offsets-45 with log end offset 0 
2016-07-31 06:48:11,025 [INFO ][kafka-request-handler-0]: Created log for partition [__consumer_offsets,45] in log/kafka_2 with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 104857600, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> compact, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 2628000000, segment.jitter.ms -> 0}. 
2016-07-31 06:48:11,913 [INFO ][kafka-request-handler-0]: [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([ 

1) Can you tell me why my consumer is not able to get any messages?
2) Are my producer and consumer configs okay? Should my consumer/producer connect to ZooKeeper instead of connecting to the brokers directly?
3) What does epoch mean on the controller?
4) What does the following warning mean: "No checkpointed highwatermark is found for partition"?


Btw: ZooKeeper needs an odd number of servers to work properly. –

Answer


You are apparently running a Kafka broker version that is older than your client version. Double-check your Kafka broker and client versions.

The error says the broker cannot parse the consumer's JoinGroupRequest. Most likely your consumer is sending a version of JoinGroupRequest that the broker does not understand.

In general, the Kafka broker version should be greater than or equal to the client version you use, to avoid errors like this.

The solution is to either upgrade your Kafka brokers or downgrade the clients you use.

Your configs look mostly fine, except for this one:

props.put("partition.assignment.strategy", "range"); 

This setting relates to the old consumer, so it was most likely ignored by your producer; it does nothing there and can safely be removed.
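For what it's worth, if you ever do want to choose an assignment strategy with the new 0.9 consumer (not the producer), the property of the same name takes an assignor class name rather than the old "range" string. A sketch, assuming the built-in range assignor (which is already the default):

// New (0.9+) Java consumer: the value is an assignor class name, not "range"
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RangeAssignor");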

An epoch is similar to a version or generation ID of the cluster state. It allows proper state synchronization between the regular brokers and the controller.
