import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""),(2L,""),(3L,""))),
sc.makeRDD(Array(Edge(1L,2L,0),Edge(1L,3L,0))))
val ids = g.vertices.map(_._1).cache
Graph(g.vertices, ids.cartesian(ids).filter(x => x._1 < x._2)
.map(x => Edge(x._1,x._2,0)))
.outerJoinVertices(g.collectNeighborIds(EdgeDirection.Either))(
(_,_,u) => u.get.toSet)
.mapTriplets(et => ((et.srcAttr | et.dstAttr) &~
(et.srcAttr & et.dstAttr)).size)
.triplets
.map(et => (et.srcId, et.dstId, et.attr))
.collect
This short piece of code pulls a number of tricks. First is the overall strategy. The goal is to identify the symmetric difference size for every possible pair of vertices in the graph. This suggests that we need to do a Cartesian product to obtain all possible pairs of vertices. But rather than just getting the Cartesian product and doing an RDD map() directly off that, we instead create a whole new Graph where the edges are that Cartesian product. The reason is so that we can leverage outerJoinVertices() and glom on the set of nearest neighbors using collectNeighborIds()(which returns a VertexRDD, suitable for outerJoinVertices()).
And collectNeighborIds() itself is a powerful function that didn't get covered in my book. It's a convenient way to, for each vertex, gather the vertex Ids of all the neighbor vertices.
Finally, to compute the symmetry difference we use Scala Set operations, as the symmetry difference is defined as:
A Δ B = (A ∪ B) - (A ∩ B)
Note in Scala the set difference operator is &~ rather than -.
No comments:
Post a Comment