2016-10-31 6 views
2

カフカトピックに「このプログラムは実行中です」という文字列を送信するカフカプロデューサを作成しようとしています。なぜそれが動作していないのか分かりません。以下は、以下のコードです。私はCloudera分布ではない。カフカプロデューサ送信文字列が正しくありません

package kafka_test; 

import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class DataMovement { 

    public static void main(String[] args) { 

     String kafkaTopic = args[0]; 

     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(kafkaTopic, null, "This program is running."); 

     producer.send(producerRecord); 
     producer.close(); 

    } 
} 

私は、エラーメッセージが、タイムアウト得ることはありません:それはまた、カフカに関する多くの情報を出力し

を、SSL、passowrd、client.idなど

10分の16/31 10:25:46情報utils.AppInfoParser:Kafkaバージョン:0.9.0.1 16/10/31 10:25:46情報utils.AppInfoParser:Kafka commitId:コミットID 16/10/31 10:26:46 INFO producer .KafkaProducer:timeoutMillis = 9223372036854775807ミリ秒でカフカプロデューサーを閉じる。

+0

例外はありますか?また、プロデューサを閉じる前に、Thread.sleep(2000)を使用してしばらく待つことができます。プロデューサが閉じられると、kafkaはメッセージをtopicに送信しません。 – Shankar

+0

また、メッセージキーを設定していない場合は、トピック名とメッセージの2つの引数を指定してProducerRecordを使用できます。 – Shankar

+0

タイムアウトエラーです。私は編集をしました。大丈夫、鍵を変えてみましょう。私は寝てみましょう。 – Defcon

答えて

0

コードは正常に動作します。サーバが問題になりました:9092は、カフカクラスタの指定されたブローカのアドレスでなければなりません(アクティブブローカをターゲットにしていました。

+0

あなたは明確にすることができますか?アクティブなワーカーはブートストラップサーバーとして動作する必要があります。必要に応じてMetadataResponseを使用してクライアントをリダイレクトするだけで別のサーバーと通信します。 –

+0

2つのブローカーがあり、それぞれに独自のノードがあります。何らかの理由で、それらのいずれかに指示すると、メッセージは入力されません(例としてこのbroker_node_2を呼び出すことができます)。私たちがsendデータをbroker_node_1に向けたとき、それはうまく動作しました。理由は不明です。クラスタが設定された方法かもしれません。 – Defcon

関連する問題