5. Pandas API On Spark

The pandas API on Spark gives pandas users a familiar DataFrame interface backed by Spark execution. It is useful when code reads naturally in pandas style but the data is too large for a single machine. It is still Spark underneath, so partitioning, shuffles, execution plans, and collection to the driver still matter.

The official pandas API on Spark documentation emphasizes best practices such as using plain PySpark APIs when they are clearer, checking execution plans before running expensive operations, avoiding unnecessary shuffles, and choosing an index strategy deliberately.

[1]:
from pathlib import Path
import shutil

from pyspark.sql import SparkSession, functions as F

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "pandas-api-on-spark"

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-pandas-api-on-spark")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("ERROR")

print("Spark version:", spark.version)
print("Spark master:", sc.master)

Spark version: 3.5.8
Spark master: local[*]
[2]:
import pandas as pd
import pyspark.pandas as ps

ps.set_option("compute.default_index_type", "distributed-sequence")
print("pandas API on Spark imported")

pandas API on Spark imported

5.1. A Pandas-Like DataFrame

The API looks familiar: create a DataFrame, select columns, assign new columns, and group by values. The result is not a local pandas DataFrame until you explicitly collect it.

[3]:
psdf = ps.DataFrame({
    "customer_id": [1, 2, 3, 4, 5],
    "region": ["north", "south", "north", "west", "south"],
    "amount": [10.0, 25.5, 7.5, 40.0, 13.0],
})

psdf["large_order"] = (psdf["amount"] >= 20).astype("int64")
summary = psdf.groupby("region").agg({"amount": "sum", "large_order": "sum"}).sort_index()
summary

[3]:
amount large_order
region
north 17.5 0
south 38.5 1
west 40.0 1

5.2. Moving Between APIs

You can convert from pandas API on Spark to a Spark DataFrame when Spark SQL functions make the next step clearer. You can also expose a Spark DataFrame through the pandas API on Spark when pandas-style operations are easier to read.

[4]:
spark_df = psdf.to_spark(index_col="row_id")
spark_df.show()

round_trip = spark_df.pandas_api(index_col="row_id")
round_trip.head(3)

+------+-----------+------+------+-----------+
|row_id|customer_id|region|amount|large_order|
+------+-----------+------+------+-----------+
|     0|          1| north|  10.0|          0|
|     1|          2| south|  25.5|          1|
|     2|          3| north|   7.5|          0|
|     3|          4|  west|  40.0|          1|
|     4|          5| south|  13.0|          0|
+------+-----------+------+------+-----------+

[4]:
customer_id region amount large_order
row_id
0 1 north 10.0 0
1 2 south 25.5 1
2 3 north 7.5 0

5.3. Plans Still Matter

Pandas-style code can still trigger Spark shuffles. Calling .spark.explain() on a pandas-on-Spark object prints the underlying Spark physical plan, and is a useful reminder that this API is not local pandas.

[5]:
summary.spark.explain()

== Physical Plan ==
*(3) Project [__index_level_0__#21, amount#22, large_order#23L]
+- *(3) Sort [__index_level_0__#21 ASC NULLS LAST, __natural_order__#53L ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(__index_level_0__#21 ASC NULLS LAST, __natural_order__#53L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=101]
      +- *(2) HashAggregate(keys=[__index_level_0__#21], functions=[sum(amount#3), sum(large_order#20L)])
         +- Exchange hashpartitioning(__index_level_0__#21, 4), ENSURE_REQUIREMENTS, [plan_id=97]
            +- *(1) HashAggregate(keys=[__index_level_0__#21], functions=[partial_sum(amount#3), partial_sum(large_order#20L)])
               +- *(1) Project [region#2 AS __index_level_0__#21, amount#3, cast((amount#3 >= 20.0) as bigint) AS large_order#20L]
                  +- *(1) Filter atleastnnonnulls(1, region#2)
                     +- *(1) Scan ExistingRDD[__index_level_0__#0L,customer_id#1L,region#2,amount#3]


5.4. Collect Intentionally

Calling to_pandas() brings the entire dataset to the driver. That is fine for small summaries. It is not fine for large tables, where it can exhaust driver memory.

[6]:
small_summary = summary.to_pandas()
small_summary

[6]:
amount large_order
region
north 17.5 0
south 38.5 1
west 40.0 1

5.5. What To Remember

The pandas API on Spark is a bridge for pandas-fluent users and pandas-shaped code. It is not a way to ignore Spark’s execution model. Use it where it improves readability, and switch back to PySpark DataFrames when plans, joins, schemas, or production behavior need more explicit control.

[7]:
spark.stop()