2
私は時系列データを扱う際に問題があります。電力障害のために、データセットにタイムスタンプがないものがあります。私は行を追加してこのギャップを埋める必要があり、その後、欠損値を補間することができます。timeseriesの空白を埋めるSpark
入力データ:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
募集出力:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
は、今私は、データセットのforeach関数内でwhileループでこれを修正しました。問題は、whileループを実行する前にまずデータセットをドライバに収集する必要があることです。だから、それはSparkにとって正しい方法ではありません。
誰かが私に良い解決策を与えることができますか?
これは私のコードです:
MissingMeasurementsDS.collect().foreach(row => {
// empty list for new generated measurements
val output = ListBuffer.empty[Measurement]
// Missing measurements
val missingMeasurements = row.getAs[Int]("missingmeasurements")
val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
//Generate missing timestamps
var i = 1
while (i <= missingMeasurements) {
//Increment timestamp with 15 minutes (900000 milliseconds)
val newTimestamp = lastTimestamp.getTime + (900000 * i)
output += Measurement(new Timestamp(newTimestamp), 0))
i += 1
}
//Join interpolated measurements with correct measurements
completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())
RHeutzあなたがここに欠損値を追加するためのコードスニペットを貼り付けることはできますか? –