2016-08-12 9 views
2

大きなRDDをファイルDStreamとして読み込もうとしています。次のように大きなDstreamで自分のRDDのレコードを集計する

コードが見えます:それはlarge..itは私が行う必要がありexception..so投げているので、私がやろうとしています何

val creatingFunc = {() => 
    val conf = new SparkConf() 
       .setMaster("local[10]") 
       .setAppName("FileStreaming") 
       .set("spark.streaming.fileStream.minRememberDuration", "2000000h") 
       .registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable], 
classOf[org.apache.hadoop.io.Text], classOf[GGSN])) 

    val sc = new SparkContext(conf) 

    // Create a StreamingContext 
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 

    val appFile = httpFileLines 
        .map(x=> (x._1,x._2.toString())) 
        .filter(!_._2.contains("ggsnIPAddress")) 
        .map(x=>(x._1,x._2.split(","))) 

    var count=0 

    appFile.foreachRDD(s => { 
    // s.collect() throw exception due to insufficient amount of emery 
    //s.count() throw exception due to insufficient amount of memory 
    s.foreach(x => count = count + 1) 
    }) 

    println(count) 
    newContextCreated = true 
    ssc 
} 

は私RDD..howeverのカウントを取得することですメモリにデータを収集しないようにする代わりに、foreachの..

..私はその後、私のコード内の方法として、カウントを取得したいが、それは常に0を与えるが、これを行うためにそこに方法は何ですか?

+0

RDDを扱う場合、このようなローカル変数に合計を累積することはできません。 'org.apache.spark.Accumulator'を使うか、' Rdd.count'や 'DStream.count'だけを呼び出すことができます –

+0

' httpFileLines'はどこに作成されていますか? 'RDDか' DStream'ですか? –

+0

dstreamのすべての要素のrdd数またはカウントをカウントしますか? – Knight71

答えて

0

foreachRDDに電話する必要はありません。電話番号countです。それはまだメモリ例外の不十分な量が得られた場合のいずれかが毎回データの小さなバッチを計算する、またはあなたの負荷を処理するワーカーノードを拡大する必要があり、

val appFile = httpFileLines 
       .map(x => (x._1, x._2.toString())) 
       .filter(!_._2.contains("ggsnIPAddress")) 
       .map(x => (x._1, x._2.split(","))) 

val count = appFile.count() 

:あなたはDStreamに定義されたcountメソッドを使用することができます。

+0

それはDStreamのすべての要素の数を返しません、私はまだforeachを行う必要があります.. – Luckylukee

関連する問題