2017-07-17 3 views
1

スパークストリーミングアプリケーションは、driver's stdoutに簡単なステートメントを出力していません。ここでは、dstream_2を変換した直後にいくつかのステートメントを印刷しようとしていますが、 。バッチ実行ごとに印刷されることを期待していました。プログラムが最初に評価されたときに、その位置にSparkストリーミングアプリケーションでは印刷できません

val sparkConf = new SparkConf().setMaster("yarn-cluster") 
           .setAppName("SparkJob") 
           .set("spark.executor.memory","2G") 
           .set("spark.dynamicAllocation.executorIdleTimeout","5") 


val streamingContext = new StreamingContext(sparkConf, Minutes(1)) 

var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 

var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD 


val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_1)) 
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_2)) 


val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) => 
{ 
    //some mapping 
} 
//Not Working 
print("Printing Test") 
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd)) 
dstream_2.foreachRDD(r => r.repartition(500)) 
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2)) 
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2) 

val filtered = fullJoinResult.filter(r => r._2._1.isEmpty) 


filtered.foreachRDD{rdd => 

    val formatted = rdd.map(r => (r._1 , r._2._2.get)) 

    historyRdd_2.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd_2 = formatted // assign the new history 
    historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 


val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty) 


filteredStream.foreachRDD{rdd => 
    val formatted = rdd.map(r => (r._1 , r._2._1.get)) 
    historyRdd.unpersist(false) // unpersist the 'old' history RDD 
    historyRdd = formatted // assign the new history 
    historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation 
} 
streamingContext.start() 
streamingContext.awaitTermination() 

}}

答えて

1

print("Printing Test")は一度だけ印刷されます。 各バッチ間隔にいくつかのコンソール出力を追加するために、我々は出力動作の範囲にI/O操作を配置する必要があります。

これは毎回印刷されます:

dstream2.foreachRDD{ _ -> print("Printing Test") } 
+0

がために、この同じですロギング(slf4j)? – JSR29

+0

この印刷動作の理由も指定できます。 – JSR29

+0

ログ・ステートメントで同じである必要があります。再動作:sparkストリーミングはdstreamで動作します。 dstream操作の範囲外のものは、プログラム内の通常のコードとして評価されます。理解しなければならないことは、dstreamの操作はプログラム内でのみ宣言されるということです。実際の実行は、スパークストリーミングスケジューラで行われます。 – maasg

関連する問題