2017-02-23 11 views
2

私は時系列データを扱う際に問題があります。電力障害のために、データセットにタイムスタンプがないものがあります。私は行を追加してこのギャップを埋める必要があり、その後、欠損値を補間することができます。timeseriesの空白を埋めるSpark

入力データ:

periodstart    usage 
--------------------------------- 
2015-09-11 02:15   23000 
2015-09-11 03:15   23344 
2015-09-11 03:30   23283 
2015-09-11 03:45   23786 
2015-09-11 04:00   25039 

募集出力:

periodstart    usage 
--------------------------------- 
2015-09-11 02:15   23000 
2015-09-11 02:30   0 
2015-09-11 02:45   0 
2015-09-11 03:00   0 
2015-09-11 03:15   23344 
2015-09-11 03:30   23283 
2015-09-11 03:45   23786 
2015-09-11 04:00   25039 

は、今私は、データセットのforeach関数内でwhileループでこれを修正しました。問題は、whileループを実行する前にまずデータセットをドライバに収集する必要があることです。だから、それはSparkにとって正しい方法ではありません。

誰かが私に良い解決策を与えることができますか?

これは私のコードです:

MissingMeasurementsDS.collect().foreach(row => { 
    // empty list for new generated measurements 
    val output = ListBuffer.empty[Measurement] 
    // Missing measurements 
    val missingMeasurements = row.getAs[Int]("missingmeasurements") 
    val lastTimestamp = row.getAs[Timestamp]("previousperiodstart") 
    //Generate missing timestamps 
    var i = 1 
    while (i <= missingMeasurements) { 
    //Increment timestamp with 15 minutes (900000 milliseconds) 
    val newTimestamp = lastTimestamp.getTime + (900000 * i) 
    output += Measurement(new Timestamp(newTimestamp), 0)) 
    i += 1 
    } 
    //Join interpolated measurements with correct measurements 
    completeMeasurementsDS.join(output.toDS()) 
}) 
completeMeasurementsDS.show() 
println("OutputDF count = " + completeMeasurementsDS.count()) 
+0

RHeutzあなたがここに欠損値を追加するためのコードスニペットを貼り付けることはできますか? –

答えて

3

入力DataFrameは、以下の構造を有している場合:

root 
|-- periodstart: timestamp (nullable = true) 
|-- usage: long (nullable = true) 

は、最小/最大を決定します。

15分間例えばステップを設定し
val (minp, maxp) = df 
    .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint"))) 
    .as[(Long, Long)] 
    .first 

val step: Long = 15 * 60 

は基準範囲を生成します。

val reference = spark 
    .range((minp/step) * step, ((maxp/step) + 1) * step, step) 
    .select($"id".cast("timestamp").alias("periodstart")) 

はギャップに参加して、塗りつぶし:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage")) 
関連する問題