2017-01-19 3 views
1

私はKafka v0.10.0.0を使用しており、プロデューサー&コンシューマーJavaコードを作成しています。しかし、コードはproducer.sendでログに例外なく止まっています。カフカプロデューサーコンシューマーAPIの問題

誰でも助けてください。少し早いですがお礼を。

"mapr-kakfaサンプルプログラム"を使用しています。ここで完全なコードを見ることができます。 https://github.com/panwars87/kafka-sample-programs

**重要:maven依存関係ではkafka-clientのバージョンを0.10.0.0に変更し、ローカルではKafka 0.10.0.0を実行しました。

public class Producer { 
public static void main(String[] args) throws IOException { 
    // set up the producer 
    KafkaProducer<String, String> producer; 
    System.out.println("Starting Producers...."); 
    try (InputStream props = Resources.getResource("producer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     producer = new KafkaProducer<>(properties); 
     System.out.println("Property loaded successfully ...."); 
    } 

    try { 
     for (int i = 0; i < 20; i++) { 
      // send lots of messages 
      System.out.println("Sending record one by one...."); 
      producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 

      System.out.println(i+" message sent...."); 
      // every so often send to a different topic 
      if (i % 2 == 0) { 
       producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 
       producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers.")); 
       producer.flush(); 
       System.out.println("Sent msg number " + i); 
      } 
     } 
    } catch (Throwable throwable) { 
     System.out.printf("%s", throwable.getStackTrace()); 
     throwable.printStackTrace(); 
    } finally { 
     producer.close(); 
    } 

    } 
} 

public class Consumer { 
public static void main(String[] args) throws IOException { 

    // and the consumer 
    KafkaConsumer<String, String> consumer; 
    try (InputStream props = Resources.getResource("consumer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     if (properties.getProperty("group.id") == null) { 
      properties.setProperty("group.id", "group-" + new Random().nextInt(100000)); 
     } 
     consumer = new KafkaConsumer<>(properties); 
    } 
    consumer.subscribe(Arrays.asList("fast-messages", "summary-markers")); 
    int timeouts = 0; 
    //noinspection InfiniteLoopStatement 
    while (true) { 
     // read records with a short timeout. If we time out, we don't really care. 
     ConsumerRecords<String, String> records = consumer.poll(200); 
     if (records.count() == 0) { 
      timeouts++; 
     } else { 
      System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts); 
      timeouts = 0; 
     } 
     for (ConsumerRecord<String, String> record : records) { 
      switch (record.topic()) { 
       case "fast-messages": 
        System.out.println("Record value for fast-messages is :"+ record.value());    
         break; 
     case "summary-markers": 
      System.out.println("Record value for summary-markers is :"+ record.value()); 
         break; 
       default: 
        throw new IllegalStateException("Shouldn't be possible to get message on topic "); 
      } 
     } 
    } 
    } 
} 
+0

そこに行くたくさんあります - あなたが作り出す小さな何かにこれを減らすことができ、コンフィギュレーションのロードに、複数のトピックに複数のメッセージを送信したループ、フラッシュの呼び出しなどがバグや詳細を教えてください - 正確にどこが「立ち往生しますか」?最初の送信は成功しますか?第二?あなたはそれが固まっていることをどのように知っていますか?あなたは仕事を送るかどうかを見るためにログを追加しましたか? –

+0

あなたは、プロデューサーが立ち往生しているが、消費者向けのコードを貼り付けたと述べましたか? – amethystic

+0

プロデューサーとコンシューマーの両方を追加しました。 – PanwarS87

答えて

0

実行しているコードは、カフカではないmapRのデモ用です。 MapRはKafka 0.9とAPIの互換性を主張していますが、mapRはKafkaとは異なるメッセージオフセットを扱います(オフセットはインクリメンタルオフセットではなくメッセージのバイトオフセットです)。mapRの実装も非常に異なります。これは、あなたが運が良ければ、Kafka 0.9アプリがmapRで実行されている可能性があり、逆も同様であることを意味します。他のリリースの保証はありません。