2016-04-11 14 views
0

私が持っているマルチRDDSスパークScalaはキー

genderRDD

(1713926427,{gender={f=3327, m=1945, unknown=897}}) 

そして

actionRDD

(1713926427,{actionType={repost=2927, comment=2345, like=897}}) 

そして

01でマルチRDDをマージ

(1713926427,{gender={f=3327, m=1945, unknown=897},actionType={repost=2927, comment=2345, like=897},device={iphone=2999, android=12321}}) 

をだから私はそれをデータベースに保存することができ:

(1713926427,{deviceType={iphone=2999, android=12321}}) 

は、彼らは私が生成する必要がありますIDで2 RDDSをマージしたいフォーマットRDD(id, HashMap[String, HashMap[String, Integer]])

です。典型的な方法は?

答えて

0

私は{deviceType={iphone=2999, android=12321}}が、その場合には、Map[String, Map[String, Int]]を表していると仮定している - あなたは、単に結果を「フラット化」し、その後いくつかの簡単なマッピングをjoin Sを使用してください。

// some sample data: 
val rdd1 = sc.parallelize(Seq((1713926427, Map("gender" -> Map("f" -> 3327, "m" ->1945, "unknown" -> 897))))) 
val rdd2 = sc.parallelize(Seq((1713926427, Map("actionType" -> Map("repost" -> 2927, "comment" -> 2345, "like" -> 897))))) 
val rdd3 = sc.parallelize(Seq((1713926427, Map("deviceType" -> Map("iphone" -> 2999, "android" -> 12321))))) 

// join all three RDDs and map to flatten the value: 
val result = rdd1 
    .join(rdd2) 
    .join(rdd3) 
    .map { case (id, ((gender, action), device)) => (id, (gender ++ action ++ device)) } 

// result has type RDD[(Int, Map[String, Map[String, Int]])] 
result.foreach(println) 
// prints: 
// (1713926427,Map(gender -> Map(f -> 3327, m -> 1945, unknown -> 897), actionType -> Map(repost -> 2927, comment -> 2345, like -> 897), deviceType -> Map(iphone -> 2999, android -> 12321))) 
関連する問題