0

コンフルエント3.0.1プラットフォームを使用し、Kafka-Elasticsearchコネクタを構築しています。このために私はKafkaからデータを取得するためにSinkConnectorとSinkTask(Kafka-connect APIs)を拡張しています。コンフルエントプラットフォームのKafka-Connect APIでmax.poll.recordsを設定する方法

このコードの一部として、一度に100個のレコードしか取得できないように、「max.poll.records」を返すようにSinkConnectorのtaskConfigsメソッドを拡張しています。しかし、それは動作していないと私はすべてのレコードを同時に取得しており、私は規定された時間内にオフセットをコミットしていません。いずれかは、私が「max.poll.records」

public List<Map<String, String>> taskConfigs(int maxTasks) { 
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>(); 
    for (int i = 0; i < maxTasks; i++) { 
     Map<String, String> config = new HashMap<String, String>(); 
     config.put(ConfigurationConstants.CLUSTER_NAME, clusterName); 
     config.put(ConfigurationConstants.HOSTS, hosts); 
     config.put(ConfigurationConstants.BULK_SIZE, bulkSize); 
     config.put(ConfigurationConstants.IDS, elasticSearchIds); 
     config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics); 
     config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish); 
     config.put(ConfigurationConstants.TYPES, elasticSearchTypes); 
     config.put("max.poll.records", "100"); 

     configs.add(config); 
    } 
    return configs; 
    } 
+0

あなたのニーズを満たす場合、Confluent 3.1(本日リリース)にはElasticsearchシンクコネクタが含まれています。 http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar

答えて

3

あなたは、コネクタの構成でmax.poll.recordsようなほとんどのカフカの消費者のconfigsをオーバーライドすることはできませんを設定することができますしてください。 Connectワーカーの設定では、consumer.という接頭辞を使用することができます。

+0

私はworker.propertiesを作成し、上記のプロパティをプロパティファイルに提供し、以下のコマンドを実行しました。 sh ./bin/connect-standalone ./etc/schema-registry/connect-avrostandalone.properties ./etc/kafka-connect-elasticsearch/worker.properties ./etc/kafka-connect-elasticsearch/connector.properties> connectorlogs.log しかし例外が発生しました。 org.apache.kafka.common.config.ConfigException:デフォルト値を持たない必須設定 "connector.class"がありません。 – Renukaradhya

+0

私のworker.propertiesには "group.id" = operative1が含まれています。 "operative1.max.poll.records" = 1000で、私のconnector.propertiesには適切な "connector.class"が含まれていますが、まだこのエラーが発生しています。 – Renukaradhya

+0

ワーカーの設定で 'consumer.max.poll.records = 1000'が必要です – shikhar

0

これを解決しました。私は以下の設定をconnect-avro-standalone.propertiesに追加しました。

group.id=mygroup 
consumer.max.poll.records=1000 

そして、以下のコマンドを実行してマイナーコネクタを実行しました。

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties 
関連する問題