2016-07-23 4 views
0

私はアイテムの類似性によってグループ化したいアイテムのセットを含む次のRDDを持っています(同じセット内のアイテムは類似しているとみなされます)。類似性は推移的であり、一つの共通のアイテムも、同様であると考えられる)複数の値を返すようにスパークRDDを減らす

入力RDD:

Set(w1, w2) 
Set(w1, w2, w3, w4) 
Set(w5, w2, w6) 
Set(w7, w8, w9) 
Set(w10, w5, w8) --> All the first 5 set elements are similar as each of the sets have atleast one common item 
Set(w11, w12, w13) 

Iは

Set(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10) 
Set(w11, w12, w13) 

任意suggeに低減するRDD上記希望どのように私はこれを行うことができたのstions?

data.reduce((a,b) => if (a.intersect(b).size > 0) a ++ b ***else (a,b)***) 

ありがとう:私は、彼らがどんな共通の要素が含まれていない場合、私は二組の削減無視することができどこ下記のような何かを行うことができません。

答えて

0

reduceアルゴリズムは実際には間違っています。たとえば、1つのセットが次のセットとマージできない場合でも、コレクション内の別のセットとマージできます。

おそらくもっと良い方法がありますが、これをグラフの問題に変換してGraphxを使用して解決すると思います。

val data = Array(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4")) 
val setRdd = sc.parallelize(data).cache 

// Generate an unique id for each item to use as vertex's id in the graph 
val itemToId = setRdd.flatMap(_.toSeq).distinct.zipWithUniqueId.cache 
val idToItem = itemToId.map { case (item, itemId) => (itemId, item) } 

// Convert to a RDD of set of itemId 
val newSetRdd = setRdd.zipWithUniqueId 
    .flatMap { case (sets, setId) => 
    sets.map { item => (item, setId) } 
    }.join(itemToId).values.groupByKey().values 

// Create an RDD containing edges of the graph 
val edgeRdd = newSetRdd.flatMap { set => 
    val seq = set.toSeq 
    val head = seq.head 
    // Add an edge from the first item to each item in a set, 
    // including itself 
    seq.map { item => Edge[Long](head, item)} 
    } 

val graph = Graph.fromEdges(edgeRdd, Nil) 

// Run connected component algorithm to check which items are similar. 
// Items in the same component are similar 
val verticesRDD = graph.connectedComponents().vertices 

verticesRDD.join(idToItem).values.groupByKey.values.collect.foreach(println) 
+0

優れています。ありがとう。 SparkのGraphxライブラリを探検したことはありません。 – soontobeared

関連する問題