6. Graphs

6.1. Data

[1]:
from pyspark.sql.functions import lit
from graphframes import GraphFrame

# Vertex table: one row per person. GraphFrames expects the unique key column
# to be called "id". A constant "entity" column is appended to every row.
# Built through the SparkSession (`spark`), the same entry point the
# parquet-reading cell uses; `sqlContext` is the legacy equivalent.
v = spark.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Fanny", 36),
        ("g", "Gabby", 60),
    ],
    ["id", "name", "age"],
).withColumn("entity", lit("person"))

# Edge table: one row per directed relationship. GraphFrames expects the
# endpoint columns to be called "src" and "dst"; their values are vertex ids.
# Note that "g" appears in no edge, so Gabby is an isolated vertex.
e = spark.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    ["src", "dst", "relationship"],
)

# Combine both tables into the graph object.
g = GraphFrame(v, e)
[2]:
# Display the vertex DataFrame: the three input columns plus the constant "entity" tag.
g.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
+---+-------+---+------+

[3]:
# Display the edge DataFrame; each row is one edge from src to dst, labelled with its relationship.
g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

6.2. In-degrees

[4]:
# In-degree = number of edges pointing at a vertex. Vertices with no incoming
# edge (here "g") do not appear in the result at all.
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       1|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+

6.3. Group by, vertices

[5]:
# groupBy() with no columns treats the whole vertex table as a single group,
# so the aggregate yields one row holding the smallest age.
g.vertices.groupBy().min('age').show()
+--------+
|min(age)|
+--------+
|      29|
+--------+

6.4. Filter, vertices

[6]:
# Keep only the vertices matching a SQL predicate string. The comparison is
# strict, so Charlie (age 30) is excluded along with David (29).
g.vertices.filter('age > 30').show()
+---+------+---+------+
| id|  name|age|entity|
+---+------+---+------+
|  a| Alice| 34|person|
|  b|   Bob| 36|person|
|  e|Esther| 32|person|
|  f| Fanny| 36|person|
|  g| Gabby| 60|person|
+---+------+---+------+

6.5. Filter, edges

[7]:
# Keep only the "follow" edges. The predicate is a SQL expression, which is
# why the string value inside it is single-quoted.
g.edges.filter("relationship = 'follow'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
+---+---+------------+

[8]:
# Keep only the "friend" edges (the complement of the "follow" filter on this data set).
g.edges.filter("relationship = 'friend'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

6.6. Motif finding

[9]:
# Motif query: (a)-[e1]->(b) together with (b)-[e2]->(a) matches every ordered
# pair of vertices joined by edges in both directions, so each mutual pair
# shows up twice (once per assignment of a and b).
# a, b, e1 and e2 are pattern variables that become the result columns; they
# are unrelated to the vertex ids "a" and "b".
g.find('(a)-[e1]->(b); (b)-[e2]->(a)').show()
+--------------------+--------------+--------------------+--------------+
|                   a|            e1|                   b|            e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
|[b, Bob, 36, person]|[b, c, follow]|[c, Charlie, 30, ...|[c, b, follow]|
+--------------------+--------------+--------------------+--------------+

[10]:
# Same mutual-edge motif, narrowed by predicates on the matched vertices'
# attributes: b must be older than 30 and a must be at least 30.
(
    g.find('(a)-[e1]->(b); (b)-[e2]->(a)')
    .filter('b.age > 30')
    .filter('a.age >= 30')
    .show()
)
+--------------------+--------------+--------------------+--------------+
|                   a|            e1|                   b|            e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
+--------------------+--------------+--------------------+--------------+

6.7. Subgraph

[11]:
# Build a subgraph: keep every vertex of g, but only the "follow" edges whose
# source vertex is younger than its destination. The single-edge motif exposes
# both endpoints' attributes so the edge can be filtered on them; the final
# select restores the plain src/dst/relationship edge schema.
s = GraphFrame(
    g.vertices,
    (
        g.find("(a)-[e]->(b)")
        .filter("e.relationship = 'follow'")
        .filter("a.age < b.age")
        .select('e.src', 'e.dst', 'e.relationship')
    ),
)
[12]:
# The subgraph was built from g.vertices unchanged, so all seven people are still listed.
s.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
+---+-------+---+------+

[13]:
# Only the "follow" edges whose source is younger than its destination remain.
s.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  c|  b|      follow|
+---+---+------------+

6.8. Algorithms

6.8.2. Connected components

[16]:
# A checkpoint directory is set on the SparkContext before running the
# algorithm (the GraphFrames connected-components implementation checkpoints
# intermediate results).
sc.setCheckpointDir('/tmp')

# Vertices sharing a component value belong to the same connected component;
# "g" has no edges and therefore ends up alone.
(
    g.connectedComponents()
    .select("id", "component")
    .orderBy("component")
    .show()
)
+---+------------+
| id|   component|
+---+------------+
|  g|146028888064|
|  a|412316860416|
|  b|412316860416|
|  c|412316860416|
|  d|412316860416|
|  f|412316860416|
|  e|412316860416|
+---+------------+

6.8.3. Strongly connected components

[17]:
# Strongly connected components take edge direction into account: two vertices
# share a component only when each can reach the other. The run is limited to
# 10 iterations.
(
    g.stronglyConnectedComponents(maxIter=10)
    .select("id", "component")
    .orderBy("component")
    .show()
)
+---+-------------+
| id|    component|
+---+-------------+
|  g| 146028888064|
|  f| 412316860416|
|  d| 670014898176|
|  a| 670014898176|
|  e| 670014898176|
|  b|1047972020224|
|  c|1047972020224|
+---+-------------+

6.8.4. Label propagation algorithm

[18]:
# Label propagation, limited to 5 iterations; vertices ending with the same
# label are assigned to the same community.
(
    g.labelPropagation(maxIter=5)
    .select("id", "label")
    .orderBy("label")
    .show()
)
+---+-------------+
| id|        label|
+---+-------------+
|  g| 146028888064|
|  e| 412316860416|
|  a| 670014898176|
|  d| 670014898176|
|  f| 670014898176|
|  b|1047972020224|
|  c|1382979469312|
+---+-------------+

6.8.5. PageRank

[19]:
# Run PageRank with a reset probability of 0.15; tol=0.01 is the convergence
# tolerance (per the GraphFrames API). The result `r` is itself a graph: its
# vertices carry a "pagerank" column and its edges a "weight" column.
r = g.pageRank(resetProbability=0.15, tol=0.01)
[20]:
# Vertex table of the PageRank result: the original columns plus the computed "pagerank" score.
r.vertices.show()
+---+-------+---+------+-------------------+
| id|   name|age|entity|           pagerank|
+---+-------+---+------+-------------------+
|  g|  Gabby| 60|person| 0.1799821386239711|
|  b|    Bob| 36|person|  2.655507832863289|
|  e| Esther| 32|person|0.37085233187676075|
|  a|  Alice| 34|person|0.44910633706538744|
|  f|  Fanny| 36|person| 0.3283606792049851|
|  d|  David| 29|person| 0.3283606792049851|
|  c|Charlie| 30|person| 2.6878300011606218|
+---+-------+---+------+-------------------+

[21]:
# Edge table of the PageRank result: the original columns plus "weight". In
# this output each weight equals 1 / (number of outgoing edges of src), e.g.
# 0.5 for the two edges leaving "a".
r.edges.show()
+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
|  a|  b|      friend|   0.5|
|  b|  c|      follow|   1.0|
|  e|  f|      follow|   0.5|
|  e|  d|      friend|   0.5|
|  c|  b|      follow|   1.0|
|  a|  e|      friend|   0.5|
|  f|  c|      follow|   1.0|
|  d|  a|      friend|   1.0|
+---+---+------------+------+

6.8.6. Shortest path

[22]:
# For every vertex, compute the hop count to each landmark ("a" and "d") it
# can reach by following edge direction. Unreachable landmarks are simply
# absent from the "distances" map, so b, c, f and g show an empty map.
g.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+------+----------------+
| id|   name|age|entity|       distances|
+---+-------+---+------+----------------+
|  g|  Gabby| 60|person|              []|
|  b|    Bob| 36|person|              []|
|  e| Esther| 32|person|[d -> 1, a -> 2]|
|  a|  Alice| 34|person|[a -> 0, d -> 2]|
|  f|  Fanny| 36|person|              []|
|  d|  David| 29|person|[d -> 0, a -> 1]|
|  c|Charlie| 30|person|              []|
+---+-------+---+------+----------------+

6.8.7. Triangle count

[23]:
# Count the triangles each vertex takes part in; a, d and e form the only
# triangle in this graph, so they get 1 and everyone else 0.
g.triangleCount().show()
+-----+---+-------+---+------+
|count| id|   name|age|entity|
+-----+---+-------+---+------+
|    0|  g|  Gabby| 60|person|
|    0|  f|  Fanny| 36|person|
|    1|  e| Esther| 32|person|
|    1|  d|  David| 29|person|
|    0|  c|Charlie| 30|person|
|    0|  b|    Bob| 36|person|
|    1|  a|  Alice| 34|person|
+-----+---+-------+---+------+

6.9. Input/Output (IO)

6.9.1. Writing

[24]:
# A GraphFrame is persisted as its two DataFrames, each written separately in
# parquet format. 'overwrite' mode replaces whatever already exists at the
# target path.
(
    g.vertices.write
    .format('parquet')
    .mode('overwrite')
    .save('/user/root/graph.vertices')
)

(
    g.edges.write
    .format('parquet')
    .mode('overwrite')
    .save('/user/root/graph.edges')
)

6.9.2. Reading

[25]:
# Rebuild the graph from the two parquet datasets written earlier: vertices
# first, edges second, matching the GraphFrame(v, e) argument order.
gg = GraphFrame(
    spark.read.format('parquet').load('/user/root/graph.vertices'),
    spark.read.format('parquet').load('/user/root/graph.edges'),
)
[26]:
# Vertices of the reloaded graph. The rows match the original vertex table,
# but they come back in a different order than g.vertices.show() printed them.
gg.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
+---+-------+---+------+

[27]:
# Edges of the reloaded graph; same eight rows as the original edge table.
gg.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

6.10. Message passing

[28]:
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

# Message definitions, evaluated once per edge:
#   - the edge's source vertex receives the destination vertex's age
#   - the edge's destination vertex receives the source vertex's age
msgToSrc = AM.dst["age"]
msgToDst = AM.src["age"]
# Each vertex sums all the messages it received, i.e. the ages of its
# neighbours counted once per connecting edge in either direction
# (e.g. "f": 30 from f->c plus 32 from e->f = 62).
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)
# Vertices that receive no message (here the isolated vertex "g") are absent
# from the result.
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
|  f|        62|
|  e|        99|
|  d|        66|
|  c|       108|
|  b|        94|
|  a|        97|
+---+----------+