2016-07-30 2 views
1

私はkafkaを新しくしました。私の要件は、私はデータベースのソースと宛先に2つのテーブルがあります。今私はソーステーブルからデータをフェッチして、これらのカフカの間の宛先に格納して、プロデューサとコンシューマとして動作させたいと考えています。私はコードを実行しましたが、問題は、プロデューサがデータを生成するときに、生成するデータが欠けているということです。たとえば、ソース表に100レコードがある場合、100レコードすべてが生成されるわけではありません。Apache Kafka:すべてのデータを生産していないプロデューサー

public void run() { 
    SourceDAO sourceDAO = new SourceDAO(); 
    Source source; 
    int id; 
    try { 
     logger.debug("INSIDE RUN"); 
     List<Source> listOfEmployee = sourceDAO.getAllSource(); 
     Iterator<Source> sourceIterator = listOfEmployee.iterator(); 
     String sourceJson; 
     Gson gson = new Gson(); 
     while(sourceIterator.hasNext()) { 
      source = sourceIterator.next(); 
      sourceJson = gson.toJson(source); 
      id = source.getId(); 
      producerRecord = new ProducerRecord<Integer, String>(TOPIC, id, sourceJson); 
      producerRecords.add(producerRecord); 
     } 

     for(ProducerRecord<Integer, String> record : producerRecords) { 
      logger.debug("Producer Record: " + record.value()); 
      producer.send(record, new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        logger.debug("Exception: " + exception); 
        if (exception != null) 
         throw new RuntimeException(exception.getMessage()); 
        logger.info("The offset of the record we just sent is: " + metadata.offset() 
          + " In Partition : " + metadata.partition()); 
       } 
      }); 
     } 
     producer.close(); 
     producer.flush(); 
     logger.info("Size of Record: " + producerRecords.size()); 
    } catch (SourceServiceException e) { 
     logger.error("Unable to Produce data...", e); 
     throw new RuntimeException("Unable to Produce data...", e); 
    } 
} 

マイ消費者設定: - - :

bootstrap.servers=192.168.1.XXX:9092,192.168.1.231:XXX,192.168.1.232:XXX 
group.id=consume 
client.id=C1 
enable.auto.commit=true 
auto.commit.interval.ms=1000 
max.partition.fetch.bytes=10485760 
session.timeout.ms=35000 
consumer.timeout.ms=35000 
auto.offset.reset=earliest 
message.max.bytes=10000000 
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer 

value.deserializer = orgのように私は私のプロデューサーコードカフカ-0.10

MyProducer CONFIG-

bootstrap.servers=192.168.1.XXX:9092,192.168.1.XXX:9093,192.168.1.XXX:9094 
acks=all 
retries=2 
batch.size=16384 
linger.ms=2 
buffer.memory=33554432 
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer 
value.serializer=org.apache.kafka.common.serialization.StringSerializer 

を使用しています。 apache.kafka.common.serialization.StringDeserializer

コンシューマーコード: -

public void doWork() { 
    logger.debug("Inside doWork of DestinationConsumer"); 
    DestinationDAO destinationDAO = new DestinationDAO(); 
    consumer.subscribe(Collections.singletonList(this.TOPIC)); 
    while(true) { 
     ConsumerRecords<String, String> consumerRecords = consumer.poll(1000); 
     int minBatchSize = 1; 
     for(ConsumerRecord<String, String> rec : consumerRecords) { 
      logger.debug("Consumer Recieved Record: " + rec); 
      consumerRecordsList.add(rec); 
     } 
     logger.debug("Record Size: " + consumerRecordsList.size()); 
     if(consumerRecordsList.size() >= minBatchSize) { 
      try { 
       destinationDAO.insertSourceDataIntoDestination(consumerRecordsList); 
      } catch (DestinationServiceException e) { 
       logger.error("Unable to update destination table"); 
      } 
     } 
    } 
} 

答えて

1

私はあなたがフラッシュしなかったことを推測またはプロデューサを閉じますここseensすることができるものから。

kafka documentation

send()メソッドからの非同期されています。あなたはそれが非同期実行を送信し、ちょうど後に(プロデューサの構成に応じて)送信されたバッチを用意注意してください。呼び出されると、保留中のレコード送信のバッファにレコードが追加され、すぐに返されます。これにより、生産者は効率のために個々の記録をまとめてバッチ処理することができます。 (:なぜあなたは多くのレコードを持っているときかもしれないが、問題を引き起こす全体producerRecordsをキャッシュしているBTW)あなたは試してみてください何

は、すべてのproducerRecordsを繰り返し処理した後producer.close()を呼び出すことです。

それが役に立たない場合は、コンソールの消費者が何が欠けているかを把握する。いくつかのコードを提供してください。プロデューサの設定方法あなたの消費者はどのように見えますか? producerRecordsのタイプは何ですか?

希望に役立ちます。

+0

どのメッセージが欠落していると言うことができますか?最初の1つ?最後のもの?ランダム? – TobiSH

関連する問題