2016-08-31 6 views
0

私はscalaとsparkが初めてです。私は少し問題があります。私は以下のスキーマを持つRDDを持っています。ScalaのSparkのRDDとcase(Schema)のマッピング

RDD[((String, String), (Int, Timestamp, String, Int))] 

と私は、この

RDD[(Int, String, String, String, Timestamp, Int)] 

のようにそれを変換するために、このRDDをマッピングする必要があると私は、この

map { case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level) } 

この作品の罰金のためのコードを、以下の記述します。今、私は別のRDD

RDD[((String, String), List[(Int, Timestamp, String, Int)])] 

を持っていると私は、私はこのコードを試してみましたことをどのように行うことができます

RDD[(Int, String, String, String, Timestamp, Int)] 

上記のように、このようにそれを変換したいが、それは

map { 
    case ((pid, name), List(id, date, code, level)) => (id, name, code, pid, date, level) 
} 
を動作しません。

どのように達成できますか?

+0

を試してみてください "しかし、それは動作しません"。どのように動作しないかを説明してください。 –

+0

これは 'RDD [(Int、Timestamp、String、Int、String)、String、String、(Int、Timestamp、String、Int、String)、(Int、Timestamp、String、Int、String)、 、Timestamp、String、Int、String))] ' –

答えて

1

これはお探しのものですか?

val input: RDD[((String, String), List[(Int, Timestamp, String, Int)])] = ... 
val output: RDD[(Int, String, String, String, Timestamp, Int)] = input.flatMap { case ((pid, name), list) => 
    list.map { case (id, date, code, level) => 
    (id, name, code, pid, date, level) 
    } 
} 

や理解のために使用した:

val output: RDD[(Int, String, String, String, Timestamp, Int)] = for { 
    ((pid, name), list)  <- input 
    (id, date, code, level) <- list 
} yield (id, name, code, pid, date, level) 
+0

あります。 Thanx @PawełJurczenko –

0

map { 
    case ((id, name), list) => (id, name, list.flatten) 
} 
+1

こんにちは。あなたのコードは正しいかもしれませんが、いくつかの文脈ではより良い答えになります。たとえば、この提案された変更がどのようにしてなぜ問題の問題を解決するのか、そしておそらく関連文書へのリンクを含む理由を説明することができます。そうすれば、彼らにとってより有用になり、同様の問題に対する解決策を探している他のサイトの読者にとってもより有用になります。 –

関連する問題