3. Graphs
3.1. Local Spark setup
[ ]:
# Local Spark setup for the book examples.
#
# Creates (or reuses) a local SparkSession for the small demo graph below and
# exposes the handles later cells use: `spark`, `sc`, and the legacy alias
# `sqlContext`. Files written by this chapter go under ./_spark_output/graphs.
from pathlib import Path
import os
import warnings
from pyspark.sql import SparkSession
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "graphs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-graphs")
    # Bind the driver to the loopback address (presumably to avoid
    # hostname-resolution problems on laptops; confirm).
    .config("spark.driver.host", "127.0.0.1")
    # Small, fixed partition counts keep shuffles cheap on a toy dataset.
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    # Adaptive query execution is turned off; NOTE(review): presumably so that
    # partitioning/plans stay predictable for the book output.
    .config("spark.sql.adaptive.enabled", "false")
)
# Optional path to a GraphFrames jar to put on the JVM classpath.
graphframes_jar = os.environ.get("SPARK_GRAPHFRAMES_JAR")
if graphframes_jar:
    if Path(graphframes_jar).exists():
        builder = builder.config("spark.jars", graphframes_jar)
    else:
        # A bad path used to be ignored silently, which only surfaced later as
        # an opaque JVM error; warn now so the misconfiguration is traceable.
        warnings.warn(
            f"SPARK_GRAPHFRAMES_JAR={graphframes_jar!r} does not exist; "
            "GraphFrames classes may be missing from the JVM classpath."
        )
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Legacy name kept so cells written against SQLContext still work.
sqlContext = spark
3.2. Data
[1]:
from pyspark.sql.functions import lit
from graphframes import GraphFrame
# Vertices: (id, name, age), each tagged with a constant entity="person" column.
v = (
    sqlContext.createDataFrame(
        [
            ("a", "Alice", 34),
            ("b", "Bob", 36),
            ("c", "Charlie", 30),
            ("d", "David", 29),
            ("e", "Esther", 32),
            ("f", "Fanny", 36),
            ("g", "Gabby", 60),
        ],
        schema=["id", "name", "age"],
    )
    .withColumn("entity", lit("person"))
)
# Directed edges: (src, dst, relationship).
e = sqlContext.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    schema=["src", "dst", "relationship"],
)
# Property graph used by every following cell.
g = GraphFrame(v, e)
[2]:
# Vertex table: one row per vertex with its attribute columns.
g.vertices.show()
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+-------+---+------+
[3]:
# Edge table: one row per directed edge (src -> dst) with its relationship label.
g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
3.3. In-degrees
[4]:
# Number of incoming edges per vertex; a vertex with no incoming edges
# (Gabby, "g", in this data) has no row in the result.
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
| f| 1|
| e| 1|
| d| 1|
| c| 2|
| b| 2|
| a| 1|
+---+--------+
3.4. Group by, vertices
[5]:
# Global (ungrouped) aggregate: youngest age across all vertices.
g.vertices.agg({'age': 'min'}).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
3.5. Filter, vertices
[6]:
# Vertices strictly older than 30 (`where` is an alias of `filter`).
g.vertices.where('age > 30').show()
+---+------+---+------+
| id| name|age|entity|
+---+------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| e|Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+------+---+------+
3.6. Filter, edges
[7]:
# Only the "follow" edges.
g.edges.where("relationship = 'follow'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
+---+---+------------+
[8]:
# Only the "friend" edges.
g.edges.where("relationship = 'friend'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
3.7. Motif finding
[9]:
# Motif search: vertex pairs joined by edges in both directions (a -> b and b -> a).
# Each mutual pair appears once per orientation.
(
    g.find("(a)-[e1]->(b); (b)-[e2]->(a)")
    .show()
)
+--------------------+--------------+--------------------+--------------+
| a| e1| b| e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
|[b, Bob, 36, person]|[b, c, follow]|[c, Charlie, 30, ...|[c, b, follow]|
+--------------------+--------------+--------------------+--------------+
[10]:
# Same mutual-edge motif, keeping only matches where b is over 30 and a is at least 30.
(
    g.find("(a)-[e1]->(b); (b)-[e2]->(a)")
    .where("b.age > 30 AND a.age >= 30")
    .show()
)
+--------------------+--------------+--------------------+--------------+
| a| e1| b| e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
+--------------------+--------------+--------------------+--------------+
3.8. Subgraph
[11]:
# Subgraph: every original vertex, but only "follow" edges that point from a
# younger person to an older one. Selecting the struct fields of `e` yields
# plain src/dst/relationship columns for the new edge table.
s = GraphFrame(
    g.vertices,
    (
        g.find("(a)-[e]->(b)")
        .where("e.relationship = 'follow'")
        .where("a.age < b.age")
        .select('e.src', 'e.dst', 'e.relationship')
    ),
)
[12]:
# The subgraph keeps every original vertex, including ones left without edges.
s.vertices.show()
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
+---+-------+---+------+
[13]:
# Only the "follow" edges whose source is younger than its destination remain.
s.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| e| f| follow|
| c| b| follow|
+---+---+------------+
3.9. Algorithms
3.9.1. Breadth-first search
[14]:
# Breadth-first search from Esther to any vertex younger than 32.
g.bfs(fromExpr="name = 'Esther'", toExpr="age < 32").show()
+--------------------+--------------+--------------------+
| from| e0| to|
+--------------------+--------------+--------------------+
|[e, Esther, 32, p...|[e, d, friend]|[d, David, 29, pe...|
+--------------------+--------------+--------------------+
[15]:
# Same search, but never traversing "friend" edges and limited to paths of at most 3 edges.
g.bfs(
    fromExpr="name = 'Esther'",
    toExpr="age < 32",
    edgeFilter="relationship != 'friend'",
    maxPathLength=3,
).show()
+--------------------+--------------+--------------------+--------------+--------------------+
| from| e0| v1| e1| to|
+--------------------+--------------+--------------------+--------------+--------------------+
|[e, Esther, 32, p...|[e, f, follow]|[f, Fanny, 36, pe...|[f, c, follow]|[c, Charlie, 30, ...|
+--------------------+--------------+--------------------+--------------+--------------------+
3.9.2. Connected components
[ ]:
# Checkpoint directory for iterative graph algorithms. NOTE(review): the
# 'graphx' implementation may not require it; kept so the default works too.
sc.setCheckpointDir(str(OUTPUT_DIR / 'graph-checkpoints'))
# Component id per vertex, grouped by sorting on the component column.
(
    g.connectedComponents(algorithm='graphx')
    .select("id", "component")
    .orderBy("component")
    .show()
)
3.9.3. Strongly connected components
[17]:
# Strongly connected components (at most 10 iterations), sorted by component id.
(
    g.stronglyConnectedComponents(maxIter=10)
    .select("id", "component")
    .sort("component")
    .show()
)
+---+-------------+
| id| component|
+---+-------------+
| g| 146028888064|
| f| 412316860416|
| d| 670014898176|
| a| 670014898176|
| e| 670014898176|
| b|1047972020224|
| c|1047972020224|
+---+-------------+
3.9.4. Label propagation algorithm
[18]:
# Community labels from label propagation (5 iterations), sorted by label.
(
    g.labelPropagation(maxIter=5)
    .select("id", "label")
    .sort("label")
    .show()
)
+---+-------------+
| id| label|
+---+-------------+
| g| 146028888064|
| e| 412316860416|
| a| 670014898176|
| d| 670014898176|
| f| 670014898176|
| b|1047972020224|
| c|1382979469312|
+---+-------------+
3.9.5. PageRank
[19]:
# PageRank with reset probability 0.15; `tol` is the convergence tolerance.
# The result `r` is a GraphFrame whose vertices/edges carry extra score columns.
r = g.pageRank(tol=0.01, resetProbability=0.15)
[20]:
# Input vertices with an added `pagerank` score column.
r.vertices.show()
+---+-------+---+------+-------------------+
| id| name|age|entity| pagerank|
+---+-------+---+------+-------------------+
| g| Gabby| 60|person| 0.1799821386239711|
| b| Bob| 36|person| 2.655507832863289|
| e| Esther| 32|person|0.37085233187676075|
| a| Alice| 34|person|0.44910633706538744|
| f| Fanny| 36|person| 0.3283606792049851|
| d| David| 29|person| 0.3283606792049851|
| c|Charlie| 30|person| 2.6878300011606218|
+---+-------+---+------+-------------------+
[21]:
# Input edges with an added `weight` column (in this output it equals
# 1 / out-degree of the source vertex).
r.edges.show()
+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
| a| b| friend| 0.5|
| b| c| follow| 1.0|
| e| f| follow| 0.5|
| e| d| friend| 0.5|
| c| b| follow| 1.0|
| a| e| friend| 0.5|
| f| c| follow| 1.0|
| d| a| friend| 1.0|
+---+---+------------+------+
3.9.6. Shortest paths
[22]:
# Hop distance from every vertex to each landmark ("a" and "d"); an empty map
# means no landmark is reachable from that vertex along edge directions.
(
    g.shortestPaths(landmarks=["a", "d"])
    .show()
)
+---+-------+---+------+----------------+
| id| name|age|entity| distances|
+---+-------+---+------+----------------+
| g| Gabby| 60|person| []|
| b| Bob| 36|person| []|
| e| Esther| 32|person|[d -> 1, a -> 2]|
| a| Alice| 34|person|[a -> 0, d -> 2]|
| f| Fanny| 36|person| []|
| d| David| 29|person|[d -> 0, a -> 1]|
| c|Charlie| 30|person| []|
+---+-------+---+------+----------------+
3.9.7. Triangle count
[23]:
# Per-vertex count of triangles the vertex belongs to; in this data only
# a, d and e (edges d->a, a->e, e->d) form one.
g.triangleCount().show()
+-----+---+-------+---+------+
|count| id| name|age|entity|
+-----+---+-------+---+------+
| 0| g| Gabby| 60|person|
| 0| f| Fanny| 36|person|
| 1| e| Esther| 32|person|
| 1| d| David| 29|person|
| 0| c|Charlie| 30|person|
| 0| b| Bob| 36|person|
| 1| a| Alice| 34|person|
+-----+---+-------+---+------+
3.10. Input/Output (IO)
3.10.1. Writing
[ ]:
# Persist both halves of the graph as Parquet under OUTPUT_DIR, replacing any
# output from a previous run.
g.vertices.write.mode('overwrite').parquet(str(OUTPUT_DIR / 'graph.vertices'))
g.edges.write.mode('overwrite').parquet(str(OUTPUT_DIR / 'graph.edges'))
3.10.2. Reading
[ ]:
# Rebuild a GraphFrame from the Parquet files written above.
gg = GraphFrame(
    spark.read.format('parquet').load(str(OUTPUT_DIR / 'graph.vertices')),
    spark.read.format('parquet').load(str(OUTPUT_DIR / 'graph.edges')),
)
[26]:
# Vertices read back from Parquet; row order can differ from the original (as below).
gg.vertices.show()
+---+-------+---+------+
| id| name|age|entity|
+---+-------+---+------+
| d| David| 29|person|
| e| Esther| 32|person|
| f| Fanny| 36|person|
| g| Gabby| 60|person|
| a| Alice| 34|person|
| b| Bob| 36|person|
| c|Charlie| 30|person|
+---+-------+---+------+
[27]:
# Edges read back from Parquet.
gg.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| b| friend|
| b| c| follow|
| c| b| follow|
| f| c| follow|
| e| f| follow|
| e| d| friend|
| d| a| friend|
| a| e| friend|
+---+---+------------+
3.11. Message passing
[28]:
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM
# Along every edge, the source is sent the destination's age...
msgToSrc = AM.dst["age"]
# ...and the destination is sent the source's age.
msgToDst = AM.src["age"]
# Each vertex sums all ages it received, i.e. the ages of its neighbours in
# both directions; vertices with no edges get no row.
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst,
)
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
| f| 62|
| e| 99|
| d| 66|
| c| 108|
| b| 94|
| a| 97|
+---+----------+