2016-04-01 18 views
2

私はspark経由でHDFSに保存したいflumeストリームを持っています。以下は、私は私のspsarkストリーミングジョブを起動すると、私はFlume + Spark - HDFSにDStreamを保存

object FlumePull { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println(
     "Usage: FlumePollingEventCount <host> <port>") 
     System.exit(1) 
    } 

    val batchInterval = Milliseconds(60000) 
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") 
    val ssc = new StreamingContext(sparkConf, batchInterval) 
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999) 

    stream.map(x => x + "!!!!") 
      .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

を実行しています火花コードがあり、それは、HDFSに保存出力を行いますが、出力は次のようなものです:

[[email protected] ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 

それは代わりの水路イベントを記憶していますFlumeからのデータどのようにデータを取り出すのですか?あなたはSparkFlumeEventからの根本的なバッファーを抽出し、それを保存する必要が

おかげ

答えて

0

。たとえば、あなたのイベントの本文がStringの場合:

stream.map(x => new String(x.event.getBody.array) + "!!!!") 
     .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 
関連する問題