2016-07-08 31 views
2

私はspring-kafka(spring-integration-kafkaではなく)とアプリケーションを統合しています。 http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsinglespring-kafka(統合されていない)コンシューマがメッセージを消費しない

私のプロデューサーは完全に機能しますが、コンシューマーはメッセージを消費していません。任意のポインタ。

@EnableKafka 
public class MyConfig { 

    @Value("${kafka.broker.list}") // List of servers server:port, 
    private String kafkaBrokerList; 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(12); 
     factory.getContainerProperties().setPollTimeout(3000); 
     factory.getContainerProperties().setIdleEventInterval(60000L); 
     factory.setAutoStartup(Boolean.TRUE); 
     factory.setMessageConverter(new StringJsonMessageConverter()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, Message> consumerFactory() { 
     JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class); 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); 
     props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000); 
     props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000); 
     return props; 
    } 

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory") 
    public void listen(@Payload Message message) { 
     System.out.println(message); 
    } 

} 

が**応答のためのより多くの情報**

おかげゲイリーで編集:

はここに私の構成です。ログに例外はありません。また、同様の設定でKafkaTemplateを試しましたが、キューにメッセージを公開することはできますが、消費者にとっては不運です。 Messageオブジェクトの代わりにStringを使用するようにコードを変更しています。ここでは、ログの一部である:

2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values: 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    group.id = 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    max.partition.fetch.bytes = 1048576 
    bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093] 
    retry.backoff.ms = 10000 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    enable.auto.commit = true 
    ssl.key.password = null 
    fetch.max.wait.ms = 500 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 60000 
    ssl.truststore.password = null 
    session.timeout.ms = 15000 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer 
    ssl.protocol = TLS 
    check.crcs = true 
    request.timeout.ms = 40000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.keystore.location = null 
    heartbeat.interval.ms = 3000 
    auto.commit.interval.ms = 10000 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    fetch.min.bytes = 1 
    send.buffer.bytes = 131072 
    auto.offset.reset = latest 

また、私はログに次のように表示します。

2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
+1

コードがよさそうです。だから、あなたが探しているサンプルが不明です。たとえ追加しても、それは非常に単純で非常に広い設定オプションはありません。それで、あなたは間違ったカフカやトピックに接続するか、何らかのエラーを示すログに何かがあります。 –

+0

応答のためにゲイリーに感謝します。ログに例外はありません。また、同様の設定でKafkaTemplateを試してみましたが、キューにメッセージを公開することはできますが、消費者にとっては不運です。 Messageオブジェクトの代わりにStringを使用するようにコードを変更しています。コメントの長さの制限のため、私はメインポストでこれを追加しました。 – Shailesh

+0

このカテゴリを追加した後、ログの詳細を追跡できます: 'org.apache.kafka.clients = DEBUG' –

答えて

2

ドキュメント上で参照は言う:

シリアライザ/デシリアライザのAPIは非常にシンプルかつ柔軟であるが、低レベルのKafka Consumer and Producerの観点からは、KafkaTemplateと@ KafkaListenerが存在するMessagingレベルでは十分ではありません。 org.springframework.messaging.Messageへの簡単な変換のために、Spring for Apache KafkaはMessagingMessageConverter実装とStringJsonMessageConverterのカスタマイズでMessageConverter抽象化を提供します。

しかし、あなたの場合には、あなたは MessageConverter組み合わせ:

 JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class); 

最も単純なケースではなくStringDeserializerを使用する必要がありますあなたのための修正:カスタムDeserializer

 factory.setMessageConverter(new StringJsonMessageConverter()); 

https://kafka.apache.org/090/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html

上記のログメッセージMarking the coordinator XXX dead.について言えば、このエラーはspring-kafkaプロジェクトに関連していませんが、問題はカフカの設定にあることを意味します。私の場合、カフカのノードが動物園に到達できない場所では、このような問題が発生しました。問題のトラブルシューティングのために、私はローカルの両方カフカ& Zookeperをインストールし、kafka-console-producerkafka-console-consumerを使用して、その上に生産がかかり作品、例えば、そのことを確認することをお勧めいたします:

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html

そして、次の段階として、あなたがかもしれません同一のローカルインストールでサンプルspring-kafkaのアプリケーションを確認してください。

+0

同時に、 'JsonDeserializer'は、ペイロードのためにPOJOを使用して、カフカのメッセージからインスタンスを抽出することができます。 – stepio

+0

ありがとうStepio、私はStringJsonMessageConverterを削除してみましたが、まだ私はメッセージが消費されて表示されません。また、複雑さを軽減するために、カスタムJSONデシリアライザの代わりにStringDesilizerでテストするコードを変更しています。 'code' 新しいDefaultKafkaConsumerFactory <>(consumerConfigs()、新しいIntegerDeserializer()、新しいStringDeserializer())を返します。 'code' と消費者をに変更される: ' code' containerProps.setMessageListener((MessageListenerの<整数、文字列>)メッセージ - > { のSystem.out.println(message.value());} ); 'コード – Shailesh

関連する問題