2017-11-19 4 views
0

プロデューサから送信されたデータがコンシューマに届かない理由はわかりません。 私はcloudera仮想マシンに取り組んでいます。 私は、プロデューサーがKafkaを使用し、消費者がスパークストリーミングを使用する単純なプロデューサーコンシューマーを作成しようとしています。カフカとスパークストリーミングシンプルプロデューサコンシューマ

Scalaではプロデューサーコード:Scalaでは

import java.util.Properties 
import org.apache.kafka.clients.producer._ 

object kafkaProducer { 

    def main(args: Array[String]) { 
    val props = new Properties() 
    props.put("bootstrap.servers", "localhost:9092") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

    val producer = new KafkaProducer[String, String](props) 

    val TOPIC = "test" 

    for (i <- 1 to 50) { 
     Thread.sleep(1000) //every 1 second 
     val record = new ProducerRecord(TOPIC, generator.getID().toString(),generator.getRandomValue().toString()) 
     producer.send(record) 
    } 

    producer.close() 
    } 
} 

コンシューマーコード:

 val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

:問題は、消費者のコードの行を変更することで解決され

import java.util 

import org.apache.kafka.clients.consumer.KafkaConsumer 

import scala.collection.JavaConverters._ 
import java.util.Properties 

import kafka.producer._ 

import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.kafka._ 

object kafkaConsumer { 
     def main(args: Array[String]) { 


     var totalCount = 0L 
     val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost") 
     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     ssc.checkpoint("checkpoint") 
     val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

     stream.foreachRDD((rdd: RDD[_], time: Time) => { 
      val count = rdd.count() 
      println("\n-------------------") 
      println("Time: " + time) 
      println("-------------------") 
      println("Received " + count + " events\n") 
      totalCount += count 
     }) 
     ssc.start() 
     Thread.sleep(20 * 1000) 
     ssc.stop() 

     if (totalCount > 0) { 
      println("PASSED") 
     } else { 
      println("FAILED") 
     } 
     } 
} 
+0

あなたはプロデューサーと消費者を順番に開始すると思いますか? – nabongs

+0

はい、コンシューマを起動してから、プロデューサを起動します。 – MennatAllahHany

+0

プロデューサコードをコンソールコンシューマでテストし、コンシューマコードをコンソールプロデューサでテストしましたか? Kafka - Sparkの統合は難しいです... –

答えて

0

2番目のパラメータは、動物園のポートでなければなりません。動物園のポートは、2192で9092でなく、飼い主が接続できますKafkaポート9092が自動的に起動します。

注:カフカは、プロデューサとコンシューマの両方を実行する前にターミナルから起動する必要があります。

関連する問題