4. Execution Plans
Spark code is lazy. Most DataFrame operations build a plan, and Spark does not run that plan until an action asks for a result. Learning to read execution plans is one of the most useful Spark habits because plans explain where Spark will scan data, filter rows, shuffle records, broadcast small tables, and reuse cached results.
This chapter uses small local DataFrames so the plan is easier to read. The same ideas apply when the data is much larger.
[1]:
from pyspark.sql import SparkSession, functions as F
spark = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-execution-plans")
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.sql.autoBroadcastJoinThreshold", "-1")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print("Spark version:", spark.version)
print("Spark master:", sc.master)
Spark version: 3.5.8
Spark master: local[*]
[2]:
users = spark.createDataFrame(
[
(1, "Ada", "north"),
(2, "Ben", "south"),
(3, "Cid", "north"),
(4, "Dia", "west"),
],
["user_id", "name", "region"],
)
orders = spark.createDataFrame(
[
(101, 1, 25.50, "book"),
(102, 1, 80.00, "course"),
(103, 2, 19.99, "book"),
(104, 3, 120.00, "bundle"),
(105, 3, 12.75, "book"),
(106, 4, 42.00, "course"),
],
["order_id", "user_id", "amount", "category"],
)
orders.show()
+--------+-------+------+--------+
|order_id|user_id|amount|category|
+--------+-------+------+--------+
| 101| 1| 25.5| book|
| 102| 1| 80.0| course|
| 103| 2| 19.99| book|
| 104| 3| 120.0| bundle|
| 105| 3| 12.75| book|
| 106| 4| 42.0| course|
+--------+-------+------+--------+
4.1. Lazy Evaluation
The next cell creates a new DataFrame named large_orders. Spark does not scan rows when this variable is assigned. It records a logical transformation: filter the orders, then select a few columns. The action is show(), which is where Spark finally runs a job.
[3]:
large_orders = (
orders
.filter(F.col("amount") >= 50)
.select("order_id", "user_id", "amount")
)
print(type(large_orders).__name__)
large_orders.show()
DataFrame
+--------+-------+------+
|order_id|user_id|amount|
+--------+-------+------+
| 102| 1| 80.0|
| 104| 3| 120.0|
+--------+-------+------+
4.2. explain()
explain("formatted") prints the physical plan in sections. For a simple filter and projection, the plan is mostly local to each partition. There is no need to move records across partitions because each row can be evaluated independently.
[4]:
large_orders.explain("formatted")
== Physical Plan ==
* Project (3)
+- * Filter (2)
+- * Scan ExistingRDD (1)
(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [order_id#6L, user_id#7L, amount#8, category#9], MapPartitionsRDD[9] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Filter [codegen id : 1]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Condition : (isnotnull(amount#8) AND (amount#8 >= 50.0))
(3) Project [codegen id : 1]
Output [3]: [order_id#6L, user_id#7L, amount#8]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
4.3. Finding Shuffles
A shuffle is a data movement step. Grouping by a key, joining large tables, sorting globally, and changing partitioning can all require Spark to move records between executors. In an execution plan, a shuffle often appears as an Exchange.
The aggregation below groups orders by user_id. Spark must bring records with the same user id together before it can finish the total.
[5]:
spend_by_user = (
orders
.groupBy("user_id")
.agg(
F.count("*").alias("orders"),
F.round(F.sum("amount"), 2).alias("total_amount"),
)
)
spend_by_user.explain("formatted")
spend_by_user.orderBy("user_id").show()
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
+- * HashAggregate (3)
+- * Project (2)
+- * Scan ExistingRDD (1)
(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [order_id#6L, user_id#7L, amount#8, category#9], MapPartitionsRDD[9] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Project [codegen id : 1]
Output [2]: [user_id#7L, amount#8]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
(3) HashAggregate [codegen id : 1]
Input [2]: [user_id#7L, amount#8]
Keys [1]: [user_id#7L]
Functions [2]: [partial_count(1), partial_sum(amount#8)]
Aggregate Attributes [2]: [count#58L, sum#59]
Results [3]: [user_id#7L, count#60L, sum#61]
(4) Exchange
Input [3]: [user_id#7L, count#60L, sum#61]
Arguments: hashpartitioning(user_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=57]
(5) HashAggregate [codegen id : 2]
Input [3]: [user_id#7L, count#60L, sum#61]
Keys [1]: [user_id#7L]
Functions [2]: [count(1), sum(amount#8)]
Aggregate Attributes [2]: [count(1)#51L, sum(amount#8)#53]
Results [3]: [user_id#7L, count(1)#51L AS orders#52L, round(sum(amount#8)#53, 2) AS total_amount#54]
+-------+------+------------+
|user_id|orders|total_amount|
+-------+------+------------+
| 1| 2| 105.5|
| 2| 1| 19.99|
| 3| 2| 132.75|
| 4| 1| 42.0|
+-------+------+------------+
4.4. Shuffle Joins And Broadcast Joins
When Spark joins two DataFrames, it needs rows with matching keys to meet. With automatic broadcast joins disabled in this session, the first plan uses a shuffle join. The second plan uses broadcast(users) to tell Spark that the user table is small enough to send to each worker.
Broadcasting a small dimension table is often faster than shuffling both sides of a join, but broadcasting a table that is too large can create memory pressure.
[6]:
shuffle_join = orders.join(users, on="user_id", how="inner")
print("Shuffle join")
shuffle_join.explain("formatted")
broadcast_join = orders.join(F.broadcast(users), on="user_id", how="inner")
print("Broadcast join")
broadcast_join.explain("formatted")
broadcast_join.orderBy("order_id").show()
Shuffle join
== Physical Plan ==
* Project (10)
+- * SortMergeJoin Inner (9)
:- * Sort (4)
: +- Exchange (3)
: +- * Filter (2)
: +- * Scan ExistingRDD (1)
+- * Sort (8)
+- Exchange (7)
+- * Filter (6)
+- * Scan ExistingRDD (5)
(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [order_id#6L, user_id#7L, amount#8, category#9], MapPartitionsRDD[9] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Filter [codegen id : 1]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Condition : isnotnull(user_id#7L)
(3) Exchange
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: hashpartitioning(user_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=134]
(4) Sort [codegen id : 2]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [user_id#7L ASC NULLS FIRST], false, 0
(5) Scan ExistingRDD [codegen id : 3]
Output [3]: [user_id#0L, name#1, region#2]
Arguments: [user_id#0L, name#1, region#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(6) Filter [codegen id : 3]
Input [3]: [user_id#0L, name#1, region#2]
Condition : isnotnull(user_id#0L)
(7) Exchange
Input [3]: [user_id#0L, name#1, region#2]
Arguments: hashpartitioning(user_id#0L, 4), ENSURE_REQUIREMENTS, [plan_id=140]
(8) Sort [codegen id : 4]
Input [3]: [user_id#0L, name#1, region#2]
Arguments: [user_id#0L ASC NULLS FIRST], false, 0
(9) SortMergeJoin [codegen id : 5]
Left keys [1]: [user_id#7L]
Right keys [1]: [user_id#0L]
Join type: Inner
Join condition: None
(10) Project [codegen id : 5]
Output [6]: [user_id#7L, order_id#6L, amount#8, category#9, name#1, region#2]
Input [7]: [order_id#6L, user_id#7L, amount#8, category#9, user_id#0L, name#1, region#2]
Broadcast join
== Physical Plan ==
* Project (7)
+- * BroadcastHashJoin Inner BuildRight (6)
:- * Filter (2)
: +- * Scan ExistingRDD (1)
+- BroadcastExchange (5)
+- * Filter (4)
+- * Scan ExistingRDD (3)
(1) Scan ExistingRDD [codegen id : 2]
Output [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [order_id#6L, user_id#7L, amount#8, category#9], MapPartitionsRDD[9] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Filter [codegen id : 2]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Condition : isnotnull(user_id#7L)
(3) Scan ExistingRDD [codegen id : 1]
Output [3]: [user_id#0L, name#1, region#2]
Arguments: [user_id#0L, name#1, region#2], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(4) Filter [codegen id : 1]
Input [3]: [user_id#0L, name#1, region#2]
Condition : isnotnull(user_id#0L)
(5) BroadcastExchange
Input [3]: [user_id#0L, name#1, region#2]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=187]
(6) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [user_id#7L]
Right keys [1]: [user_id#0L]
Join type: Inner
Join condition: None
(7) Project [codegen id : 2]
Output [6]: [user_id#7L, order_id#6L, amount#8, category#9, name#1, region#2]
Input [7]: [order_id#6L, user_id#7L, amount#8, category#9, user_id#0L, name#1, region#2]
+-------+--------+------+--------+----+------+
|user_id|order_id|amount|category|name|region|
+-------+--------+------+--------+----+------+
| 1| 101| 25.5| book| Ada| north|
| 1| 102| 80.0| course| Ada| north|
| 2| 103| 19.99| book| Ben| south|
| 3| 104| 120.0| bundle| Cid| north|
| 3| 105| 12.75| book| Cid| north|
| 4| 106| 42.0| course| Dia| west|
+-------+--------+------+--------+----+------+
4.5. Caching
Caching asks Spark to keep a DataFrame after it is computed. This helps when the same expensive intermediate result will be reused. The first action materializes the cache. Later actions can read from memory instead of recomputing the whole lineage.
Caching is not free. It uses executor memory, and cached data can be evicted when memory is needed elsewhere.
[7]:
cached_spend = spend_by_user.cache()
# Materialize the cache.
cached_spend.count()
cached_spend.filter(F.col("total_amount") >= 50).explain("formatted")
cached_spend.orderBy(F.desc("total_amount")).show()
== Physical Plan ==
* Filter (8)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- * HashAggregate (7)
+- Exchange (6)
+- * HashAggregate (5)
+- * Project (4)
+- * Scan ExistingRDD (3)
(1) InMemoryTableScan
Output [3]: [user_id#7L, orders#52L, total_amount#54]
Arguments: [user_id#7L, orders#52L, total_amount#54], [isnotnull(total_amount#54), (total_amount#54 >= 50.0)]
(2) InMemoryRelation
Arguments: [user_id#7L, orders#52L, total_amount#54], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@2cf95efa,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[user_id#7L], functions=[count(1), sum(amount#8)], output=[user_id#7L, orders#52L, total_amount#54])
+- Exchange hashpartitioning(user_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=256]
+- *(1) HashAggregate(keys=[user_id#7L], functions=[partial_count(1), partial_sum(amount#8)], output=[user_id#7L, count#60L, sum#61])
+- *(1) Project [user_id#7L, amount#8]
+- *(1) Scan ExistingRDD[order_id#6L,user_id#7L,amount#8,category#9]
,None)
(3) Scan ExistingRDD [codegen id : 1]
Output [4]: [order_id#6L, user_id#7L, amount#8, category#9]
Arguments: [order_id#6L, user_id#7L, amount#8, category#9], MapPartitionsRDD[9] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(4) Project [codegen id : 1]
Output [2]: [user_id#7L, amount#8]
Input [4]: [order_id#6L, user_id#7L, amount#8, category#9]
(5) HashAggregate [codegen id : 1]
Input [2]: [user_id#7L, amount#8]
Keys [1]: [user_id#7L]
Functions [2]: [partial_count(1), partial_sum(amount#8)]
Aggregate Attributes [2]: [count#58L, sum#59]
Results [3]: [user_id#7L, count#60L, sum#61]
(6) Exchange
Input [3]: [user_id#7L, count#60L, sum#61]
Arguments: hashpartitioning(user_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=256]
(7) HashAggregate [codegen id : 2]
Input [3]: [user_id#7L, count#60L, sum#61]
Keys [1]: [user_id#7L]
Functions [2]: [count(1), sum(amount#8)]
Aggregate Attributes [2]: [count(1)#51L, sum(amount#8)#53]
Results [3]: [user_id#7L, count(1)#51L AS orders#52L, round(sum(amount#8)#53, 2) AS total_amount#54]
(8) Filter [codegen id : 1]
Input [3]: [user_id#7L, orders#52L, total_amount#54]
Condition : (isnotnull(total_amount#54) AND (total_amount#54 >= 50.0))
+-------+------+------------+
|user_id|orders|total_amount|
+-------+------+------------+
| 3| 2| 132.75|
| 1| 2| 105.5|
| 4| 1| 42.0|
| 2| 1| 19.99|
+-------+------+------------+
4.6. Partitions
Partitions are the units of parallel work. On a laptop, local[*] uses local CPU cores. In a cluster, partitions are spread across executors. Too few partitions can underuse the available compute. Too many tiny partitions can spend more time scheduling tasks than processing data.
[8]:
partition_report = [
("orders", orders.rdd.getNumPartitions()),
("orders.repartition(4, 'user_id')", orders.repartition(4, "user_id").rdd.getNumPartitions()),
("orders.coalesce(1)", orders.coalesce(1).rdd.getNumPartitions()),
]
spark.createDataFrame(partition_report, ["dataset", "partitions"]).show(truncate=False)
+--------------------------------+----------+
|dataset |partitions|
+--------------------------------+----------+
|orders |4 |
|orders.repartition(4, 'user_id')|4 |
|orders.coalesce(1) |1 |
+--------------------------------+----------+
4.7. Spark UI
When Spark is running locally, the Spark UI is usually available from the driver process. In Docker, the URL printed below is inside the container unless ports are explicitly published. The UI is still worth knowing because it connects code to jobs, stages, tasks, shuffles, storage, and SQL plans.
[9]:
print(sc.uiWebUrl or "Spark UI is not available for this session")
http://127.0.0.1:4040
4.8. What To Remember
Use explain("formatted") when performance or behavior is unclear. Look for scans, filters, Exchange nodes, join strategies, and cache reads. The point is not to memorize every plan node. The point is to learn where Spark is moving data and which parts of your code force that movement.
[10]:
spark.stop()