3. Graphs

3.1. Local Spark setup

[ ]:
# Local Spark setup for the book examples.
#
# Creates (or reuses) a local SparkSession, creates the output directory used
# by the write/read and checkpoint cells of this chapter, and optionally puts a
# GraphFrames JAR (path taken from $SPARK_GRAPHFRAMES_JAR) on Spark's classpath.
import warnings
from pathlib import Path
import os
from pyspark.sql import SparkSession

DATA_DIR = Path.cwd()
# Everything this chapter writes (parquet files, checkpoints) goes under here.
OUTPUT_DIR = DATA_DIR / "_spark_output" / "graphs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-graphs")
    .config("spark.driver.host", "127.0.0.1")
    # Small fixed partition counts and no adaptive execution; presumably to
    # keep local runs light and plans/outputs reproducible — confirm intent.
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
)
graphframes_jar = os.environ.get("SPARK_GRAPHFRAMES_JAR")
if graphframes_jar:
    if Path(graphframes_jar).exists():
        # NOTE: spark.jars is read when the SparkContext starts; if a session
        # already exists in this kernel, getOrCreate() will not attach it.
        builder = builder.config("spark.jars", graphframes_jar)
    else:
        # A mistyped path used to be ignored silently, which only surfaced
        # later as a confusing JVM class-not-found error in GraphFrame calls.
        warnings.warn(
            f"SPARK_GRAPHFRAMES_JAR is set to {graphframes_jar!r}, but no file "
            "exists there; continuing without adding it to spark.jars."
        )
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Later cells call sqlContext.createDataFrame(...); SparkSession provides the
# same method, so the legacy name is kept as an alias to the session.
sqlContext = spark

3.2. Data

[1]:
from pyspark.sql.functions import lit
from graphframes import GraphFrame

# Vertex table: one row per person, then a constant entity="person" column.
v = (
    sqlContext.createDataFrame(
        [
            ("a", "Alice", 34),
            ("b", "Bob", 36),
            ("c", "Charlie", 30),
            ("d", "David", 29),
            ("e", "Esther", 32),
            ("f", "Fanny", 36),
            ("g", "Gabby", 60),
        ],
        ["id", "name", "age"],
    )
    .withColumn("entity", lit("person"))
)

# Edge table: directed src -> dst edges, each labelled with a relationship.
e = sqlContext.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    ["src", "dst", "relationship"],
)

# GraphFrame pairs the vertex table ("id" column) with the edge table
# ("src"/"dst" columns).
g = GraphFrame(v, e)
[2]:
# Display the vertex DataFrame, including the derived "entity" column.
g.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
+---+-------+---+------+

[3]:
# Display the directed edge DataFrame (src -> dst, relationship label).
g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

3.3. In-degrees

[4]:
# Number of incoming edges per vertex. Vertices with no incoming edge (here g,
# Gabby) do not appear in the result at all rather than showing 0.
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       1|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+

3.4. Group by, vertices

[5]:
# Aggregate over the whole vertex table (no grouping keys); the dict form
# names the result column "min(age)", the same as groupBy().min('age').
g.vertices.agg({"age": "min"}).show()
+--------+
|min(age)|
+--------+
|      29|
+--------+

3.5. Filter, vertices

[6]:
# Keep only vertices strictly older than 30 (Charlie, exactly 30, is dropped).
g.vertices.where("age > 30").show()
+---+------+---+------+
| id|  name|age|entity|
+---+------+---+------+
|  a| Alice| 34|person|
|  b|   Bob| 36|person|
|  e|Esther| 32|person|
|  f| Fanny| 36|person|
|  g| Gabby| 60|person|
+---+------+---+------+

3.6. Filter, edges

[7]:
# Edges whose relationship label is 'follow'.
g.edges.where("relationship = 'follow'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
+---+---+------------+

[8]:
# Edges whose relationship label is 'friend'.
g.edges.where("relationship = 'friend'").show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

3.7. Motif finding

[9]:
# Motif: pairs of vertices joined by edges in both directions. Each such pair
# is matched twice, once per orientation (b/c and c/b in the output below).
g.find('(a)-[e1]->(b); (b)-[e2]->(a)').show()
+--------------------+--------------+--------------------+--------------+
|                   a|            e1|                   b|            e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
|[b, Bob, 36, person]|[b, c, follow]|[c, Charlie, 30, ...|[c, b, follow]|
+--------------------+--------------+--------------------+--------------+

[10]:
# Same two-way motif, restricted by the ages of the matched endpoints
# (motif columns are structs, so fields are addressed as a.age / b.age).
g.find('(a)-[e1]->(b); (b)-[e2]->(a)').where('a.age >= 30 AND b.age > 30').show()
+--------------------+--------------+--------------------+--------------+
|                   a|            e1|                   b|            e2|
+--------------------+--------------+--------------------+--------------+
|[c, Charlie, 30, ...|[c, b, follow]|[b, Bob, 36, person]|[b, c, follow]|
+--------------------+--------------+--------------------+--------------+

3.8. Subgraph

[11]:
# Subgraph: all original vertices, but only 'follow' edges that point from a
# younger person to an older one. The matched edge struct is flattened back
# into plain src/dst/relationship columns for the new edge table.
s = GraphFrame(
    g.vertices,
    g.find("(a)-[e]->(b)")
    .where("e.relationship = 'follow' AND a.age < b.age")
    .select('e.src', 'e.dst', 'e.relationship'),
)
[12]:
# The subgraph reuses g.vertices unchanged, so vertices left without any
# edge (e.g. a, d, g) are still listed.
s.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
+---+-------+---+------+

[13]:
# Only the 'follow' edges whose source is younger than the destination remain.
s.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  c|  b|      follow|
+---+---+------------+

3.9. Algorithms

3.9.2. Connected components

[ ]:
# Iterative graph algorithms in GraphFrames may checkpoint intermediate
# results, so point Spark at a directory under this chapter's output folder.
sc.setCheckpointDir(str(OUTPUT_DIR / "graph-checkpoints"))
(
    g.connectedComponents(algorithm="graphx")
    .select("id", "component")
    .sort("component")
    .show()
)

3.9.3. Strongly connected components

[17]:
# Strongly connected components (edge direction matters), capped at 10
# iterations; vertices sharing a component id are mutually reachable.
g.stronglyConnectedComponents(maxIter=10).select("id", "component").sort("component").show()
+---+-------------+
| id|    component|
+---+-------------+
|  g| 146028888064|
|  f| 412316860416|
|  d| 670014898176|
|  a| 670014898176|
|  e| 670014898176|
|  b|1047972020224|
|  c|1047972020224|
+---+-------------+

3.9.4. Label propagation algorithm

[18]:
# Community detection by label propagation, run for a fixed 5 iterations.
g.labelPropagation(maxIter=5).select("id", "label").sort("label").show()
+---+-------------+
| id|        label|
+---+-------------+
|  g| 146028888064|
|  e| 412316860416|
|  a| 670014898176|
|  d| 670014898176|
|  f| 670014898176|
|  b|1047972020224|
|  c|1382979469312|
+---+-------------+

3.9.5. PageRank

[19]:
# PageRank with a 0.15 reset probability. No maxIter is given, so it runs
# until scores change by less than tol. Returns a new GraphFrame whose vertex
# and edge tables carry the extra result columns shown in the next cells.
r = g.pageRank(resetProbability=0.15, tol=0.01)
[20]:
# Original vertex columns plus the computed "pagerank" score.
r.vertices.show()
+---+-------+---+------+-------------------+
| id|   name|age|entity|           pagerank|
+---+-------+---+------+-------------------+
|  g|  Gabby| 60|person| 0.1799821386239711|
|  b|    Bob| 36|person|  2.655507832863289|
|  e| Esther| 32|person|0.37085233187676075|
|  a|  Alice| 34|person|0.44910633706538744|
|  f|  Fanny| 36|person| 0.3283606792049851|
|  d|  David| 29|person| 0.3283606792049851|
|  c|Charlie| 30|person| 2.6878300011606218|
+---+-------+---+------+-------------------+

[21]:
# Original edge columns plus a "weight" column; the values below match
# 1 / (out-degree of src), e.g. a and e each have two outgoing edges -> 0.5.
r.edges.show()
+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
|  a|  b|      friend|   0.5|
|  b|  c|      follow|   1.0|
|  e|  f|      follow|   0.5|
|  e|  d|      friend|   0.5|
|  c|  b|      follow|   1.0|
|  a|  e|      friend|   0.5|
|  f|  c|      follow|   1.0|
|  d|  a|      friend|   1.0|
+---+---+------------+------+

3.9.6. Shortest path

[22]:
# Hop counts from every vertex to the landmarks a and d, following edge
# direction (e reaches d via e->d, then a via d->a). The map is empty for
# vertices that cannot reach any landmark.
g.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+------+----------------+
| id|   name|age|entity|       distances|
+---+-------+---+------+----------------+
|  g|  Gabby| 60|person|              []|
|  b|    Bob| 36|person|              []|
|  e| Esther| 32|person|[d -> 1, a -> 2]|
|  a|  Alice| 34|person|[a -> 0, d -> 2]|
|  f|  Fanny| 36|person|              []|
|  d|  David| 29|person|[d -> 0, a -> 1]|
|  c|Charlie| 30|person|              []|
+---+-------+---+------+----------------+

3.9.7. Triangle count

[23]:
# Number of triangles each vertex belongs to; a, d and e (a->e, e->d, d->a)
# form the only triangle in this graph.
g.triangleCount().show()
+-----+---+-------+---+------+
|count| id|   name|age|entity|
+-----+---+-------+---+------+
|    0|  g|  Gabby| 60|person|
|    0|  f|  Fanny| 36|person|
|    1|  e| Esther| 32|person|
|    1|  d|  David| 29|person|
|    0|  c|Charlie| 30|person|
|    0|  b|    Bob| 36|person|
|    1|  a|  Alice| 34|person|
+-----+---+-------+---+------+

3.10. Input/Output (IO)

3.10.1. Writing

[ ]:
# Persist both halves of the graph as parquet, replacing any previous run's
# output in OUTPUT_DIR.
g.vertices.write.parquet(str(OUTPUT_DIR / 'graph.vertices'), mode='overwrite')
g.edges.write.parquet(str(OUTPUT_DIR / 'graph.edges'), mode='overwrite')

3.10.2. Reading

[ ]:
# Rebuild a GraphFrame from the parquet directories written above.
gg = GraphFrame(
    spark.read.format('parquet').load(str(OUTPUT_DIR / 'graph.vertices')),
    spark.read.format('parquet').load(str(OUTPUT_DIR / 'graph.edges')),
)

[26]:
# Same rows as g.vertices; the row order after the parquet round-trip can
# differ (compare the output below with the original vertex table).
gg.vertices.show()
+---+-------+---+------+
| id|   name|age|entity|
+---+-------+---+------+
|  d|  David| 29|person|
|  e| Esther| 32|person|
|  f|  Fanny| 36|person|
|  g|  Gabby| 60|person|
|  a|  Alice| 34|person|
|  b|    Bob| 36|person|
|  c|Charlie| 30|person|
+---+-------+---+------+

[27]:
# Edges read back from parquet; row order is not guaranteed to match g.edges.
gg.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+

3.11. Message passing

[28]:
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

# Every edge sends the source's age forward to its destination ...
msgToDst = AM.src["age"]
# ... and the destination's age back to its source, so each vertex hears
# from all neighbours regardless of edge direction.
msgToSrc = AM.dst["age"]

# Sum the incoming messages per vertex. Vertices with no edges (g) receive
# nothing and are absent from the result.
agg = g.aggregateMessages(
    sqlsum(AM.msg).name("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst,
)
agg.show()
+---+----------+
| id|summedAges|
+---+----------+
|  f|        62|
|  e|        99|
|  d|        66|
|  c|       108|
|  b|        94|
|  a|        97|
+---+----------+