2017-01-04 3 views
1

空のパーティションをmapPartitionsWithIndexでどのように扱うべきでしょうか?spark mapPartitionsWithIndex空のパーティションを扱う

完全な例を見つけることができます:https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

私の目標は、Spark/Scala: fill nan with last good observationの改善としてRDDを経由して最後の良好な既知の値とNaN値を満たすことです。

しかし、いくつかのパーティションには、任意の値が含まれていない:それはクラッシュします値を入力する

val toCarryBd = spark.sparkContext.broadcast(toCarry) 
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = { 
    if (iter.isEmpty) { 
     iter 
    } else { 
     var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get 
     iter.map(foo => { 
     println("original ", foo) 
     if (!notMissing(Some(foo))) { 
      println("replaced") 
      // this will go into the default case 
      // FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar) 
      FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error 
     } else { 
      lastNotNullRow = Some(foo) 
      foo 
     } 
     }) 
    } 
    } 

    val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) } 

を使用する場合

###################### carry 
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None) 
(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 
() 
###################### carry 

case class FooBar(foo: Option[Date], bar: String) 
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), 
    ("2016-wrongFormat", "noValidFormat"), 
    ("2016-01-04", "lastAssumingSameDate")) 
    .toDF("foo", "bar") 
    .withColumn("foo", 'foo.cast("Date")) 
    .as[FooBar] 
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
myDf.rdd.filter(x => notMissing(Some(x))).count 
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap 

編集

出力から回答を入力すると出力されます。それでも限りmapPartitions(および類似の)作業時に空のパーティションを扱うよう

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|2016-01-04|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

答えて

1

がない100%、一般的なアプローチは、あなたが空の入力イテレータを持っているときに、正しい型の空のイテレータを返すことです。

あなたのコードがこれをやっているようですが、アプリケーションロジックにバグがあるようです(つまり、パーティションにレコードに値がない場合、前の行が前のパーティションが空ではなく、良好な行を持っている(必ずしもそうである必要はない)。この問題は、前回の正常な値を収集している各パーティションを通過することで部分的に解決されました。そして、パーティションの先頭に値がないと、収集されたアレイの値をルックアップします。

ただし、これも前のパーティションが空であると同時に発生する場合は、探しているものが見つかるまで前のパーティションの値を参照して検索する必要があります。 (これは、データセットの最初のレコードが有効であることを前提としています。

あなたのソリューションは本当に動作していますが、必ずしも必ずしも成立しないというマイナーな前提があります。

+0

素晴らしいコメントをいただきありがとうございます。これは、最後の良い既知の値ではなく、次のもので満たすのに役立ちます。 –

+0

さて、最後に知られている良いことをするために、逆方向に検索する必要があります。 – Holden

+0

あなたは+1 a -1の代わりにここを意味します: 'while(lastNotNullRow == None){ lastNotNullRow = toCarryBd.value.get(i + 1).get }'?しかし、最初のパーティションが空の場合(この場合のように)、代替マップがすでに正しい順序になっていると思います。 –

関連する問題