大きなRDDをファイルDStream
として読み込もうとしています。次のように大きなDstreamで自分のRDDのレコードを集計する
コードが見えます:それはlarge..itは私が行う必要がありexception..so投げているので、私がやろうとしています何
val creatingFunc = {() =>
val conf = new SparkConf()
.setMaster("local[10]")
.setAppName("FileStreaming")
.set("spark.streaming.fileStream.minRememberDuration", "2000000h")
.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text], classOf[GGSN]))
val sc = new SparkContext(conf)
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val appFile = httpFileLines
.map(x=> (x._1,x._2.toString()))
.filter(!_._2.contains("ggsnIPAddress"))
.map(x=>(x._1,x._2.split(",")))
var count=0
appFile.foreachRDD(s => {
// s.collect() throw exception due to insufficient amount of emery
//s.count() throw exception due to insufficient amount of memory
s.foreach(x => count = count + 1)
})
println(count)
newContextCreated = true
ssc
}
は私RDD..howeverのカウントを取得することですメモリにデータを収集しないようにする代わりに、foreachの..
..私はその後、私のコード内の方法として、カウントを取得したいが、それは常に0を与えるが、これを行うためにそこに方法は何ですか?
RDDを扱う場合、このようなローカル変数に合計を累積することはできません。 'org.apache.spark.Accumulator'を使うか、' Rdd.count'や 'DStream.count'だけを呼び出すことができます –
' httpFileLines'はどこに作成されていますか? 'RDDか' DStream'ですか? –
dstreamのすべての要素のrdd数またはカウントをカウントしますか? – Knight71