2017-06-22 1 views
1

13種類のETL操作を持つシステムでは、Spala 2.xとScalaを併用しています。それらのうち7つは比較的単純で、それぞれが単一のドメインクラスによって駆動され、主にこのクラスと負荷の処理方法のニュアンスが異なります。次のようにこの実施例の目的は、ロードされる7つのピザのトッピングがあることを言うためSpark/Scala、Datasetsおよびケースクラスを使用する多態性

負荷クラスの簡略化されたバージョンは、であり、ここでペパロニだ:すなわち

object LoadPepperoni { 
    def apply(inputFile: Dataset[Row], 
      historicalData: Dataset[Pepperoni], 
      mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row => 
     PepperoniRaw(
      weight = row.getAs[String]("weight"), 
      cost = row.getAs[String]("cost") 
     ) 
    }.toDS() 

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data 

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data 

    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw => 
     Pepperoni(value = ???, key1 = ???, key2 = ???) 
    }.toDS() 

    val joinedData = dedupedData.joinWith(historicalData, 
     historicalData.col("key1") === dedupedData.col("key1") && 
     historicalData.col("key2") === dedupedData.col("key2"), 
     "right_outer" 
    ) 

    joinedData.map { case (hist, delta) => 
     if(/* some condition */) { 
     hist.copy(value = /* some transformation */) 
     } 
    }.flatMap(list => list).toDS() 
    } 
} 

クラスは、一連の実行します操作はほとんど同じで、常に同じ順序ですが、「生」から「ドメイン」へのマッピングやマージ機能のように、トッピングごとにわずかに異なる場合があります。

7つのトッピング(マッシュルーム、チーズなど)でこれを行うには、構造とロジックがすべての負荷に共通であるため、単にクラスをコピー/ペーストしてすべての名前を変更しないでください。持っている、

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D): Dataset[D] = { 
    val sparkSession = SparkSession.builder().getOrCreate() 
    import sparkSession.implicits._ 

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row => 
... 

そして、このような「ドメイン」と「生」、またはマージからのマッピングとして、各クラス固有の動作用:代わりに、私はむしろ、このように、ジェネリック型とジェネリック「負荷」クラスを定義したいです特性を実装する抽象クラスなどがあります。これは典型的な依存性注入/多型パターンである。

しかし、私はいくつかの問題を抱えています。 Spark 2.xでは、エンコーダはネイティブタイプおよびケースクラスに対してのみ提供され、クラスをケースクラスとして一般的に識別する方法はありません。したがって、推論されたtoDS()やその他の暗黙的な機能は、ジェネリック型を使用しているときは利用できません。

また、this related question of mineに記載されているように、ジェネリックスを使用する場合にはケースクラスcopyメソッドは使用できません。

私はScalaやHaskellのような型クラスやアドホック多型などの他のデザインパターンを検討しましたが、Spark Datasetは基本的には抽象的に定義できないケースクラスのみで動作します。

これはSparkシステムの一般的な問題ですが、解決策を見つけることができません。どんな助けもありがたい。

答えて

2

.toDSを可能に暗黙的な変換は、次のとおりです。(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicitsから)

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T] 

あなたは正確にEncoder[T]のスコープには暗黙の値は、あなたの適用方法を作ったということになりましたがないということで修正されています一般的なので、この変換は起こりません。しかし、単純に暗黙のパラメータとして受け入れることができます!

object Load { 
    def apply[R,D](inputFile: Dataset[Row], 
      historicalData: Dataset[D], 
      mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = { 
... 

はその後コール負荷時に、特定のタイプと、そのタイプのエンコーダを見つけることができるはずです。呼び出しコンテキストではimport sparkSession.implicits._にする必要があることに注意してください。

編集:暗黙のnewProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T]は、タイプ(apply[R, D <: Product])を区切り、暗黙的にJavaUniverse.TypeTag[D]をパラメータとして受け入れることができるようにすることです。

+0

私はそれを試してみましょう心のブロアーのようなものですありがとうございました。私は同様のエラーaggregateByKey()と同様にコピー()(私のポストの関連する質問のリンクを参照してください)、それの範囲に必要な実装をもたらすことができる同様の魔法はありますか? –

+0

より一般的には、欠落している特定の暗黙的なものを追跡し、そのタイプが完全に指定されているコールサイトまでパラメータにすることです。 aggregateByKeyの場合は、ClassTag [D](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。私はあなたの質問を個別に 'コピ​​ー'について見て、良いアイデアがあれば返信します –

関連する問題