0
プロデューサから送信されたデータがコンシューマに届かない理由はわかりません。 私はcloudera仮想マシンに取り組んでいます。 私は、プロデューサーがKafkaを使用し、消費者がスパークストリーミングを使用する単純なプロデューサーコンシューマーを作成しようとしています。カフカとスパークストリーミングシンプルプロデューサコンシューマ
Scalaではプロデューサーコード:Scalaでは
import java.util.Properties
import org.apache.kafka.clients.producer._
object kafkaProducer {
def main(args: Array[String]) {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC = "test"
for (i <- 1 to 50) {
Thread.sleep(1000) //every 1 second
val record = new ProducerRecord(TOPIC, generator.getID().toString(),generator.getRandomValue().toString())
producer.send(record)
}
producer.close()
}
}
コンシューマーコード:
val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1))
:問題は、消費者のコードの行を変更することで解決され
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
import java.util.Properties
import kafka.producer._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object kafkaConsumer {
def main(args: Array[String]) {
var totalCount = 0L
val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1))
stream.foreachRDD((rdd: RDD[_], time: Time) => {
val count = rdd.count()
println("\n-------------------")
println("Time: " + time)
println("-------------------")
println("Received " + count + " events\n")
totalCount += count
})
ssc.start()
Thread.sleep(20 * 1000)
ssc.stop()
if (totalCount > 0) {
println("PASSED")
} else {
println("FAILED")
}
}
}
あなたはプロデューサーと消費者を順番に開始すると思いますか? – nabongs
はい、コンシューマを起動してから、プロデューサを起動します。 – MennatAllahHany
プロデューサコードをコンソールコンシューマでテストし、コンシューマコードをコンソールプロデューサでテストしましたか? Kafka - Sparkの統合は難しいです... –