2016-12-07 2 views
4

すべてのトピックのパーティションを再開すると、デバッグメッセージが継続的に表示されます。以下のように。このメッセージは、サーバー上のすべてのミリ秒を連続して表示します。Akka.Kafka - 警告メッセージ - パーティションの再開

08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8 

ここでこの はコード

val zookeeperHost = "localhost" 
val zookeeperPort = "9092" 
// Kafka queue settings 
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(zookeeperHost + ":" + zookeeperPort) 
     .withGroupId((groupName)) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 

// Streaming the Messages from Kafka queue 
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName)) 
    .map(msg => { 
     consumed(msg.record.value) 
    }) 
    .runWith(Sink.ignore) 

あるDEBUGメッセージを停止するために正しくパーティションを行うことを助けてください。

+0

継続してKafkaConsumer.resumeメソッドを呼び出しましたか? – amethystic

+0

私はKafkaConsumer.resumeメソッドを呼び出していません。私は 'Consumer.committableSource'をメインクラスから一回呼びます。 –

+0

Akka-Kafkaのパーティション構成は必要ですか?前もって感謝します!! –

答えて

1

reactive-kafka codeフェッチを開始する前に、すべてのパーティションを再開するようだ:パーティションが以前に一時停止していなかった場合

consumer.assignment().asScala.foreach { tp => 
    if (partitionsToFetch.contains(tp)) consumer.resume(java.util.Collections.singleton(tp)) 
    else consumer.pause(java.util.Collections.singleton(tp)) 
} 
def tryPoll{...} 
checkNoResult(tryPoll(0)) 

KafkaConsumer.resumeメソッドは何も行いません。

+0

私は自分のコードからパーティションを一時停止していません。しかし、再開するパーティションを継続的に取得しています。これを止める方法はありますか?これをどうすれば処理できますか?どんな提案もお願いします。前もって感謝します!! –

+0

@ArunKannanあなたはそれを解決しましたか?私は同じ問題を抱えています – igx

関連する問題