2
私はspark経由でHDFSに保存したいflumeストリームを持っています。以下は、私は私のspsarkストリーミングジョブを起動すると、私はFlume + Spark - HDFSにDStreamを保存
object FlumePull {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumePollingEventCount <host> <port>")
System.exit(1)
}
val batchInterval = Milliseconds(60000)
val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)
stream.map(x => x + "!!!!")
.saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")
ssc.start()
ssc.awaitTermination()
}
}
を実行しています火花コードがあり、それは、HDFSに保存出力を行いますが、出力は次のようなものです:
[[email protected] ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
それは代わりの水路イベントを記憶していますFlumeからのデータどのようにデータを取り出すのですか?あなたはSparkFlumeEvent
からの根本的なバッファーを抽出し、それを保存する必要が
おかげ