Friday, March 11, 2016

Symmetric Difference in GraphX

A question was posed over at the online forums for my book about how to implement symmetric difference in GraphX. The answer is the code below.

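import org.apache.spark.graphx._

// Example graph: three vertices, edges 1 -> 2 and 1 -> 3 (sc is the spark-shell SparkContext)
val g = Graph(sc.makeRDD(Array((1L,""),(2L,""),(3L,""))),
              sc.makeRDD(Array(Edge(1L,2L,0),Edge(1L,3L,0))))

val ids = g.vertices.map(_._1).cache

// New graph whose edges are every vertex pair (src < dst keeps each pair once)
Graph(g.vertices, ids.cartesian(ids).filter(x => x._1 < x._2)
       .map(x => Edge(x._1,x._2,0)))
  // Replace each vertex attribute with the set of its neighbor IDs in g
  .outerJoinVertices(g.collectNeighborIds(EdgeDirection.Either))(
     (_,_,u) => u.map(_.toSet).getOrElse(Set.empty[VertexId]))
  // Edge attribute becomes the size of the symmetric difference of the two sets
  .mapTriplets(et => ((et.srcAttr | et.dstAttr) &~
                      (et.srcAttr & et.dstAttr)).size)
  .triplets
  .map(et => (et.srcId, et.dstId, et.attr))
  .collect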

This short piece of code pulls a number of tricks. First is the overall strategy. The goal is to compute, for every possible pair of vertices in the graph, the size of the symmetric difference of their neighbor sets. This suggests that we need to do a Cartesian product to obtain all possible pairs of vertices. But rather than just getting the Cartesian product and doing an RDD map() directly off that, we instead create a whole new Graph where the edges are that Cartesian product. The reason is so that we can leverage outerJoinVertices() and glom on each vertex's set of neighbors using collectNeighborIds() (which returns a VertexRDD, suitable for outerJoinVertices()).
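To see what the filter on the Cartesian product is doing, here is a minimal sketch using plain Scala collections in place of RDDs. The names vertexIds, allPairs and uniquePairs are made up for illustration and are not part of GraphX.

```scala
// Plain Scala collection standing in for the RDD of vertex IDs.
val vertexIds: Seq[Long] = Seq(1L, 2L, 3L)

// Full Cartesian product: every ordered pair, including (v, v)
// and both (a, b) and (b, a).
val allPairs: Seq[(Long, Long)] =
  for (a <- vertexIds; b <- vertexIds) yield (a, b)

// Keeping only pairs with a < b drops the self-pairs and leaves
// each unordered pair exactly once.
val uniquePairs: Seq[(Long, Long)] =
  allPairs.filter { case (a, b) => a < b }
// uniquePairs == Seq((1, 2), (1, 3), (2, 3))
```

For n vertices the product has n * n entries and the filter keeps n * (n - 1) / 2 of them, so this approach is only practical for graphs with a modest number of vertices.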

And collectNeighborIds() itself is a powerful function that didn't get covered in my book. It's a convenient way to gather, for each vertex, the vertex IDs of all its neighbors.
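As a rough mental model of what collectNeighborIds(EdgeDirection.Either) gathers, here is a sketch over a plain Scala edge list. This is not how GraphX implements it; neighborIds and sampleEdges are hypothetical names, and the result is a Map of Sets rather than a VertexRDD of Arrays.

```scala
// Hypothetical edge list matching the example graph: 1 -> 2 and 1 -> 3.
val sampleEdges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L))

// Ignore edge direction (the effect of EdgeDirection.Either):
// the destination is a neighbor of the source and vice versa.
def neighborIds(edges: Seq[(Long, Long)]): Map[Long, Set[Long]] =
  edges
    .flatMap { case (src, dst) => Seq(src -> dst, dst -> src) }
    .groupBy(_._1)
    .map { case (id, pairs) => id -> pairs.map(_._2).toSet }

val neighbors: Map[Long, Set[Long]] = neighborIds(sampleEdges)
// neighbors == Map(1 -> Set(2, 3), 2 -> Set(1), 3 -> Set(1))
```

Note that in this sketch a vertex with no edges simply does not appear in the map.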

Finally, to compute the symmetric difference we use Scala Set operations, as the symmetric difference is defined as:

A Δ B = (A ∪ B) - (A ∩ B)

Note that in Scala the set difference operator is &~ (an alias for diff) rather than -, which removes a single element from a Set.
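The Set arithmetic can be tried on its own without Spark. Below is a small sketch; symDiff and symDiffAlt are helper names invented here, and the neighbor sets are written out by hand for the example graph (edges 1 -> 2 and 1 -> 3, direction ignored).

```scala
// Symmetric difference written as in the formula: union minus intersection.
def symDiff[A](a: Set[A], b: Set[A]): Set[A] = (a | b) &~ (a & b)

// Equivalent formulation: elements only in a, plus elements only in b.
def symDiffAlt[A](a: Set[A], b: Set[A]): Set[A] = (a &~ b) | (b &~ a)

// Neighbor sets of vertices 1, 2 and 3 in the example graph.
val nbrs1: Set[Long] = Set(2L, 3L)
val nbrs2: Set[Long] = Set(1L)
val nbrs3: Set[Long] = Set(1L)

val size12: Int = symDiff(nbrs1, nbrs2).size // 3: the sets share nothing
val size13: Int = symDiff(nbrs1, nbrs3).size // 3: the sets share nothing
val size23: Int = symDiff(nbrs2, nbrs3).size // 0: the sets are identical
```

These three sizes are what the edge attributes for the pairs (1,2), (1,3) and (2,3) should work out to for this example graph.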
