2017-12-20 8 views
0

私はKafka(バージョン0.11.0.2)サーバーAPIを使用してローカルホストでkafkaブローカーを開始しています。問題なく実行されるので、プロデューサーもメッセージの成功を送ることができます。 しかし、コンシューマはメッセージを受け取ることができず、consoleには何もエラーログはありません。コードをデバッグし、"refreshing metadata"のためにループします。私はその[OK]を実行し、0.10.xにカフカのバージョンを変更します。ここではlog.debug(グループ{}のコーディネーターディスカバリーに失敗しました。メタデータをリフレッシュしました。 "、groupId)kafka 0.11.0.xで

は、ソースコード

while (coordinatorUnknown()) { 
     RequestFuture<Void> future = lookupCoordinator(); 
     client.poll(future, remainingMs); 

     if (future.failed()) { 
      if (future.isRetriable()) { 
       remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); 
       if (remainingMs <= 0) 
        break; 

       log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId); 
       client.awaitMetadataUpdate(remainingMs); 
      } else 
       throw future.exception(); 
     } else if (coordinator != null && client.connectionFailed(coordinator)) { 
      // we found the coordinator, but the connection has failed, so mark 
      // it dead and backoff before retrying discovery 
      coordinatorDead(); 
      time.sleep(retryBackoffMs); 
     } 

     remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); 
     if (remainingMs <= 0) 
      break; 
    } 

Adddtionです。

私のカフカサーバーコードです。

private static void startKafkaLocal() throws Exception { 
    final File kafkaTmpLogsDir = File.createTempFile("zk_kafka", "2"); 
    if (kafkaTmpLogsDir.delete() && kafkaTmpLogsDir.mkdir()) { 
     Properties props = new Properties(); 
     props.setProperty("host.name", KafkaProperties.HOSTNAME); 
     props.setProperty("port", String.valueOf(KafkaProperties.KAFKA_SERVER_PORT)); 
     props.setProperty("broker.id", String.valueOf(KafkaProperties.BROKER_ID)); 
     props.setProperty("zookeeper.connect", KafkaProperties.ZOOKEEPER_CONNECT); 
     props.setProperty("log.dirs", kafkaTmpLogsDir.getAbsolutePath()); 
     //advertised.listeners=PLAINTEXT://xxx.xx.xx.xx:por 
    // flush every message. 

     // flush every 1ms 
     props.setProperty("log.default.flush.scheduler.interval.ms", "1"); 
     props.setProperty("log.flush.interval", "1"); 
     props.setProperty("log.flush.interval.messages", "1"); 
     props.setProperty("replica.socket.timeout.ms", "1500"); 
     props.setProperty("auto.create.topics.enable", "true"); 
     props.setProperty("num.partitions", "1"); 

     KafkaConfig kafkaConfig = new KafkaConfig(props); 

     KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig); 
     kafka.startup(); 
     System.out.println("start kafka ok "+kafka.serverConfig().numPartitions()); 
    } 
} 

ありがとうございました。カフカ0.11で

答えて

1

、あなたは1にnum.partitionsを設定する場合は、以下の3つの設定を設定する必要があります。

offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 

0.11を実行しているときに、サーバーのログから明らかです。

+0

ありがとうございました。どのようにサーバーのログを取得するローカルKafkaサーバーを設定するには? –

+0

'System.setProperty(" log4j.configuration "、新しいファイル(.....))を使ってKafkaのデフォルトのlog4jファイル' config/log4j.properties'を渡してください; –

+0

ありがとう –

関連する問題