6. Graphs
6.1. Data
[1]:
from pyspark.sql.functions import lit
from graphframes import GraphFrame
# Build the small social graph used throughout this section.
# The SparkSession `spark` is used (as in the IO cells) rather than the legacy
# `sqlContext`, which is deprecated since Spark 3.0.

# Vertices: GraphFrame expects the vertex identifier in an `id` column.
# Every vertex is tagged with a constant `entity` column. Note that "g" (Gabby)
# has no edges at all, so she appears as an isolated vertex in later results.
v = spark.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Fanny", 36),
        ("g", "Gabby", 60),
    ],
    ["id", "name", "age"],
).withColumn("entity", lit("person"))

# Edges: directed, with endpoints in `src` / `dst` (vertex ids) plus a
# `relationship` label of either "friend" or "follow".
e = spark.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    ["src", "dst", "relationship"],
)

g = GraphFrame(v, e)
[2]:
# Print up to 20 rows of the vertex DataFrame.
g.vertices.show(n=20)
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+-------+---+------+
[3]:
# Print up to 20 rows of the edge DataFrame.
g.edges.show(n=20)
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
6.2. In-degrees
[4]:
# Number of incoming edges per vertex. Vertices with no incoming edge
# (here only "g") do not appear in the result at all.
g.inDegrees.show(n=20)
+---+--------+
| id|inDegree|
+---+--------+
| f| 1|
| e| 1|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+---+--------+
6.3. Group by, vertices
[5]:
# An empty groupBy() puts all vertices in a single group, so this is a global
# minimum over the `age` column (reported as the `min(age)` column).
g.vertices.groupBy().agg({'age': 'min'}).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
6.4. Filter, vertices
[6]:
# Keep only vertices strictly older than 30 (`where` is an alias of `filter`).
g.vertices.where('age > 30').show()
+---+------+---+------+
| id| name|age|entity|
+---+------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| e|Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+------+---+------+
6.5. Filter, edges
[7]:
# Edges labelled "follow" only.
g.edges.where("relationship = 'follow'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
+---+---+------------+
[8]:
# Edges labelled "friend" only.
g.edges.where("relationship = 'friend'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
6.6. Motif finding
[9]:
# Motif: two vertices connected by edges in both directions.
# Each mutual pair is reported once per orientation (b/c appears twice).
(
    g.find('(a)-[e1]->(b); (b)-[e2]->(a)')
    .show()
)
+--------------------+--------------+--------------------+--------------+
| a| e1| b| e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
|[b, Bob, 36, person]|[b, c, follow]|[c, Charlie, 30, ...|[c, b, follow]|
+--------------------+--------------+--------------------+--------------+
[10]:
# Same mutual-edge motif, restricted by the ages of the matched vertices.
# The result columns are structs, so fields are addressed as `a.age` / `b.age`.
(
    g.find('(a)-[e1]->(b); (b)-[e2]->(a)')
    .where('b.age > 30')
    .where('a.age >= 30')
    .show()
)
+--------------------+--------------+--------------------+--------------+
| a| e1| b| e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
+--------------------+--------------+--------------------+--------------+
6.7. Subgraph
[11]:
# Subgraph of "follow" edges pointing from a younger to an older person.
# The motif rows hold full vertex/edge structs, so only the edge fields are
# projected back out to form a plain edge DataFrame. All original vertices
# are kept, including those no longer touched by any edge.
s = GraphFrame(
    g.vertices,
    (
        g.find("(a)-[e]->(b)")
        .filter("e.relationship = 'follow'")
        .filter("a.age < b.age")
        .select('e.src', 'e.dst', 'e.relationship')
    ),
)
[12]:
# Vertex set of the subgraph: identical to the full graph's vertices.
s.vertices.show(n=20)
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+-------+---+------+
[13]:
# Edges that survived the "follow" and younger-to-older filters.
s.edges.show(n=20)
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| e| f| follow|
| c| b| follow|
+---+---+------------+
6.8. Algorithms
6.8.1. Breadth-first search
[14]:
# Breadth-first search from vertices matching `fromExpr` to vertices matching
# `toExpr`. Here it finds Esther -> David over a single "friend" edge.
g.bfs(fromExpr="name = 'Esther'", toExpr="age < 32").show()
+--------------------+--------------+--------------------+
| from| e0| to|
+--------------------+--------------+--------------------+
|[e, Esther, 32, p...|[e, d, friend]|[d, David, 29, pe...|
+--------------------+--------------+--------------------+
[15]:
# Same search, but "friend" edges may not be traversed and path length is
# limited by maxPathLength=3. The only match is then
# Esther -follow-> Fanny -follow-> Charlie.
g.bfs(
    "name = 'Esther'",
    "age < 32",
    maxPathLength=3,
    edgeFilter="relationship != 'friend'",
).show()
+--------------------+--------------+--------------------+--------------+--------------------+
| from| e0| v1| e1| to|
+--------------------+--------------+--------------------+--------------+--------------------+
|[e, Esther, 32, p...|[e, f, follow]|[f, Fanny, 36, pe...|[f, c, follow]|[c, Charlie, 30, ...|
+--------------------+--------------+--------------------+--------------+--------------------+
6.8.2. Connected components
[16]:
# GraphFrames' connectedComponents() checkpoints intermediate results, so a
# Spark checkpoint directory must be set first. Use the SparkSession's
# context (as the rest of the notebook does) instead of the separate `sc` global.
# NOTE(review): on a cluster this must be a path reachable by all executors
# (e.g. on HDFS).
spark.sparkContext.setCheckpointDir('/tmp')

# Component ids are arbitrary numbers. Every vertex except the isolated "g"
# lands in the same (weakly) connected component.
g.connectedComponents().select("id", "component").orderBy("component").show()
+---+------------+
| id| component|
+---+------------+
| g|146028888064|
| a|412316860416|
| b|412316860416|
| c|412316860416|
| d|412316860416|
| f|412316860416|
| e|412316860416|
+---+------------+
6.8.3. Strongly connected components
[17]:
# Strongly connected components respect edge direction. In the output:
# {b, c} (mutual follows), {a, d, e} (cycle d -> a -> e -> d), {f} and {g}.
(
    g.stronglyConnectedComponents(maxIter=10)
    .select("id", "component")
    .orderBy("component")
    .show()
)
+---+-------------+
| id| component|
+---+-------------+
| g| 146028888064|
| f| 412316860416|
| d| 670014898176|
| a| 670014898176|
| e| 670014898176|
| b|1047972020224|
| c|1047972020224|
+---+-------------+
6.8.4. Label propagation algorithm
[18]:
# Community detection by label propagation, run for maxIter=5 iterations.
# Labels are arbitrary ids; vertices sharing a label form a community.
(
    g.labelPropagation(maxIter=5)
    .select("id", "label")
    .orderBy("label")
    .show()
)
+---+-------------+
| id| label|
+---+-------------+
| g| 146028888064|
| e| 412316860416|
| a| 670014898176|
| d| 670014898176|
| f| 670014898176|
| b|1047972020224|
| c|1382979469312|
+---+-------------+
6.8.5. PageRank
[19]:
# PageRank with reset probability 0.15. Passing `tol` (rather than a fixed
# `maxIter`) runs the convergence-based variant.
r = g.pageRank(
    resetProbability=0.15,
    tol=0.01,
)
[20]:
# Vertices with an added `pagerank` column. b and c dominate: they link to
# each other and also receive edges from a and f.
r.vertices.show(n=20)
+---+-------+---+------+-------------------+
| id| name|age|entity| pagerank|
+---+-------+---+------+-------------------+
| g| Gabby| 60|person| 0.1799821386239711|
| b| Bob| 36|person| 2.655507832863289|
| e| Esther| 32|person|0.37085233187676075|
| a| Alice| 34|person|0.44910633706538744|
| f| Fanny| 36|person| 0.3283606792049851|
| d| David| 29|person| 0.3283606792049851|
| c|Charlie| 30|person| 2.6878300011606218|
+---+-------+---+------+-------------------+
[21]:
# Edges with an added `weight` column. In the output it equals 1 / out-degree
# of the source vertex (a and e have two outgoing edges -> 0.5).
r.edges.show(n=20)
+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
| a| b| friend| 0.5|
| b| c| follow| 1.0|
| e| f| follow| 0.5|
| e| d| friend| 0.5|
| c| b| follow| 1.0|
| a| e| friend| 0.5|
| f| c| follow| 1.0|
| d| a| friend| 1.0|
+---+---+------------+------+
6.8.6. Shortest paths (to landmarks)
[22]:
# Distance from every vertex to each landmark ("a" and "d"), following edge
# direction. An empty map means no landmark is reachable from that vertex
# (e.g. b can only reach c).
g.shortestPaths(
    landmarks=["a", "d"],
).show()
+---+-------+---+------+----------------+
| id| name|age|entity| distances|
+---+-------+---+------+----------------+
| g| Gabby| 60|person| []|
| b| Bob| 36|person| []|
| e| Esther| 32|person|[d -> 1, a -> 2]|
| a| Alice| 34|person|[a -> 0, d -> 2]|
| f| Fanny| 36|person| []|
| d| David| 29|person|[d -> 0, a -> 1]|
| c|Charlie| 30|person| []|
+---+-------+---+------+----------------+
6.8.7. Triangle count
[23]:
# Number of triangles each vertex belongs to. a, d and e each count 1:
# they are joined by the edges d -> a, a -> e and e -> d.
g.triangleCount().show(n=20)
+-----+---+-------+---+------+
|count| id| name|age|entity|
+-----+---+-------+---+------+
| 0| g| Gabby| 60|person|
| 0| f| Fanny| 36|person|
| 1| e| Esther| 32|person|
| 1| d| David| 29|person|
| 0| c|Charlie| 30|person|
| 0| b| Bob| 36|person|
| 1| a| Alice| 34|person|
+-----+---+-------+---+------+
6.9. Input/Output (IO)
6.9.1. Writing
[24]:
# Persist the graph as two Parquet datasets, one for vertices and one for
# edges, replacing any previous copy.
# NOTE(review): absolute, user-specific paths; consider a configurable base dir.
g.vertices.write.parquet('/user/root/graph.vertices', mode='overwrite')
g.edges.write.parquet('/user/root/graph.edges', mode='overwrite')
6.9.2. Reading
[25]:
# Rebuild a GraphFrame from the vertex and edge Parquet datasets.
# Row order after reading is not the insertion order (compare the outputs).
gg = GraphFrame(
    spark.read.format('parquet').load('/user/root/graph.vertices'),
    spark.read.format('parquet').load('/user/root/graph.edges'),
)
[26]:
# Vertices read back from Parquet: same rows, different order.
gg.vertices.show(n=20)
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
+---+-------+---+------+
[27]:
# Edges read back from Parquet.
gg.edges.show(n=20)
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
6.10. Message passing
[28]:
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM
# Every edge sends the age of the vertex at its other end to each endpoint:
# the src's age goes to the dst, and the dst's age goes to the src.
msgToDst = AM.src["age"]
msgToSrc = AM.dst["age"]

# Summing per vertex totals neighbour ages over all incident edges, regardless
# of direction. A neighbour linked both ways (b <-> c) is counted twice.
# Vertices without edges ("g") receive no messages and are absent.
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToDst=msgToDst,
    sendToSrc=msgToSrc,
)
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
| f| 62|
| e| 99|
| d| 66|
| c| 108|
| b| 94|
| a| 97|
+---+----------+