2016-11-27 15 views
4

Spark Streamingを使用して、カフカのトピックからレコードを読み取ろうとしています。Spark Streamingの最初からKafkaトピックからレコードを読み取る方法は?

これは私のコードです:

object KafkaConsumer { 

    import ApplicationContext._ 

    def main(args: Array[String]) = { 

    val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "localhost:9092", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> s"${UUID.randomUUID().toString}", 
     "auto.offset.reset" -> "earliest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 

    val topics = Array("pressure") 
    val stream = KafkaUtils.createDirectStream[String, String](
     streamingContext, 
     PreferConsistent, 
     Subscribe[String, String](topics, kafkaParams) 
    ) 
    stream.print() 
    stream.map(record => (record.key, record.value)).count().print() 
    streamingContext.start() 
    } 
} 

私はこれを実行したときにそれは何も表示されません。

は、データが pressureトピックに実際に存在するかどうかを確認するために、私は、コマンドラインのアプローチを使用し、それが表示記録を行います。

bin/kafka-console-consumer.sh \ 
    --bootstrap-server localhost:9092 \ 
    --topic pressure \ 
    --from-beginning 

出力:

TimeStamp:07/13/16 15:20:45:226769,{'Pressure':'834'} 
TimeStamp:07/13/16 15:20:45:266287,{'Pressure':'855'} 
TimeStamp:07/13/16 15:20:45:305694,{'Pressure':'837'} 

何が悪いのでしょうか?

答えて

6

streamingContext.awaitTermination()が見つかりません。

-2

streamingContextを開始し、最後にstreamingContext.awaitTermination()を開始する必要があります。

関連する問題