データを外部システムに送信できる強力なプリミティブであるdstream.foreachRDD
を参照してください。以下は
Design Patterns for using foreachRDD
(が最適化されていない、ただPOCのために、KafkaProducerオブジェクトはforeachRDDで再利用することができる)あなたの参照のための私のカフカ統合コードです:私はどのように疑問に思って
DStream.foreachRDD(rdd => {
rdd.foreachPartition { partitionOfRecords =>
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", props("bootstrap.servers"))
kafkaProps.put("client.id", "KafkaIntegration Producer");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
val producer = new KafkaProducer[String, String](kafkaProps);
partitionOfRecords.foreach(record => {
val message = new ProducerRecord[String, String]("hdfs_log_test", record.asInstanceOf[String])
producer.send(message)
})
producer.close()
}
})