2016-03-31 12 views
0

私はJavaで簡単なカフカコンシューマを実装しています。ここでは、コードは次のようになります。消費者の上カフカ0.8.2コンシューマ

public class TestConsumer { 

    public static void main(String []a) throws Exception{ 
     Properties props = new Properties(); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("partition.assignment.strategy", "round-robin"); 
     props.put("group.id", "test"); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 

     try{ 
      consumer.subscribe("ay_sparktopic"); 
      Map<String, ConsumerRecords<String, String>> msg = consumer.poll(100); 
      System.out.println(msg);     
     }catch(Exception e){ 
      System.out.println("Exception"); 
     } 
    } 
} 

は、次のエラーメッセージを表示します。

16/03/30午後六時01分07秒ConsumerConfigをWARN:group.id =テストが供給されたコンフィギュレーションが、ISN」を既知の設定です。 16/03/30 18:01:07 WARN ConsumerConfig:設定partition.assignment.strategy = round-robinが提供されましたが、既知の設定ではありません。

私がオンラインで確認したドキュメントは、可能な割り当て方法として範囲またはラウンドロビンのいずれかを示し、groupIdは私の知る限りカスタム名です。ここに正しい設定値が何か分かりません。

答えて

2

カフカ0.9+でのみ利用可能な新しいコンシューマAPIを使用しようとしているようです。古いAPIを使用するには、新しいorg.apache.kafka.clients.consumerパッケージの代わりにkafka.javaapi.consumer.*パッケージからクラスをインポートする必要があります。

consumer.subscribeconsumer.pollは新しいAPIに関連しているので、古いAPIを実際に使用したい場合は、それに応じてコードを変更する必要があります。代わりに新しいコンシューマAPIを使用する場合は、Kafka 0.9以降を実行する必要があります。

0

次の依存関係を使用すると、問題が解決します。あなたは、例えば実行中の以前のバージョンを持っていても

libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.9.0.0"

カフカ0.8.2.1

関連する問題