4. UDFs Versus Built-In Functions

Python makes it easy to write row-by-row functions, and Spark can run them as user-defined functions (UDFs). The catch is that a UDF forces Spark to leave its optimized SQL engine: rows are serialized out to a Python worker, evaluated there, and serialized back. Built-in Spark SQL functions are usually faster, easier for Catalyst to optimize, and easier to reason about from the physical plan.

The rule of thumb is simple: use built-in functions first, pandas UDFs when vectorized Python is genuinely needed, and regular Python UDFs as the fallback.
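The gap behind that rule of thumb can be felt without Spark at all. The sketch below is a plain-pandas analogy, not a Spark benchmark: a per-element Python call stands in for a row-by-row UDF, and a single vectorized operation stands in for a pandas UDF.

```python
import time
import pandas as pd

# A local analogy for the cost model (pandas only, no SparkSession):
# one Python call per element vs. one batched, vectorized call.
s = pd.Series(range(1_000_000), dtype="float64")

t0 = time.perf_counter()
rowwise = s.map(lambda x: x * 100)   # one Python call per element, like a row UDF
t1 = time.perf_counter()
vectorized = s * 100                 # one batched call, like a pandas UDF
t2 = time.perf_counter()

print(f"row-wise {t1 - t0:.3f}s  vectorized {t2 - t1:.3f}s")
```

Both produce the same Series; only the call pattern differs, and that pattern is what separates `BatchEvalPython` from `ArrowEvalPython` in the plans below.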

[1]:
from pathlib import Path
import shutil

from pyspark.sql import SparkSession, functions as F

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "udfs-builtins"

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-udfs-builtins")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("ERROR")

print("Spark version:", spark.version)
print("Spark master:", sc.master)

Spark version: 3.5.8
Spark master: local[*]
[2]:
events = spark.createDataFrame(
    [
        (1, " ADA@example.COM ", "book-123", 25.50),
        (2, "ben@example.com", "course-9", 80.00),
        (3, None, "book-777", 12.00),
        (4, "cid@example.com", "unknown", None),
    ],
    ["user_id", "email", "sku", "amount"],
)

events.show(truncate=False)

+-------+-----------------+--------+------+
|user_id|email            |sku     |amount|
+-------+-----------------+--------+------+
|1      | ADA@example.COM |book-123|25.5  |
|2      |ben@example.com  |course-9|80.0  |
|3      |NULL             |book-777|12.0  |
|4      |cid@example.com  |unknown |NULL  |
+-------+-----------------+--------+------+

4.1. Built-In Functions

Built-in functions keep work inside Spark’s expression engine: Catalyst can push down, combine, and reorder many of these expressions, and whole-stage code generation can compile them to JVM bytecode. In the plan, they appear inside an ordinary Project node rather than a separate Python evaluation node.

[3]:
clean = events.select(
    "user_id",
    F.lower(F.trim("email")).alias("email"),
    F.regexp_extract("sku", r"^([a-z]+)-", 1).alias("sku_family"),
    F.coalesce("amount", F.lit(0.0)).alias("amount"),
    F.when(F.col("amount") >= 50, "large").otherwise("small_or_missing").alias("amount_band"),
)

clean.show(truncate=False)
clean.explain("formatted")

+-------+---------------+----------+------+----------------+
|user_id|email          |sku_family|amount|amount_band     |
+-------+---------------+----------+------+----------------+
|1      |ada@example.com|book      |25.5  |small_or_missing|
|2      |ben@example.com|course    |80.0  |large           |
|3      |NULL           |book      |12.0  |small_or_missing|
|4      |cid@example.com|          |0.0   |small_or_missing|
+-------+---------------+----------+------+----------------+

== Physical Plan ==
* Project (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [user_id#0L, email#1, sku#2, amount#3]
Arguments: [user_id#0L, email#1, sku#2, amount#3], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [5]: [user_id#0L, lower(trim(email#1, None)) AS email#25, regexp_extract(sku#2, ^([a-z]+)-, 1) AS sku_family#26, coalesce(amount#3, 0.0) AS amount#27, CASE WHEN (amount#3 >= 50.0) THEN large ELSE small_or_missing END AS amount_band#28]
Input [4]: [user_id#0L, email#1, sku#2, amount#3]


4.2. Regular Python UDFs

A regular Python UDF is the tool of last resort: use it when no Spark expression exists for the logic. The cost is that Spark must serialize rows to a Python worker and call the function once per row, outside the JVM execution engine. In the physical plan, look for a BatchEvalPython node.

[4]:
from pyspark.sql.types import StringType

@F.udf(StringType())
def classify_amount(amount):
    if amount is None:
        return "missing"
    return "large" if amount >= 50 else "small"

with_python_udf = events.withColumn("amount_band", classify_amount("amount"))
with_python_udf.show()
with_python_udf.explain("formatted")

+-------+-----------------+--------+------+-----------+
|user_id|            email|     sku|amount|amount_band|
+-------+-----------------+--------+------+-----------+
|      1| ADA@example.COM |book-123|  25.5|      small|
|      2|  ben@example.com|course-9|  80.0|      large|
|      3|             NULL|book-777|  12.0|      small|
|      4|  cid@example.com| unknown|  NULL|    missing|
+-------+-----------------+--------+------+-----------+

== Physical Plan ==
* Project (3)
+- BatchEvalPython (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [user_id#0L, email#1, sku#2, amount#3]
Arguments: [user_id#0L, email#1, sku#2, amount#3], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) BatchEvalPython
Input [4]: [user_id#0L, email#1, sku#2, amount#3]
Arguments: [classify_amount(amount#3)#55], [pythonUDF0#84]

(3) Project [codegen id : 2]
Output [5]: [user_id#0L, email#1, sku#2, amount#3, pythonUDF0#84 AS amount_band#56]
Input [5]: [user_id#0L, email#1, sku#2, amount#3, pythonUDF0#84]

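One habit worth adopting with regular UDFs (a suggested pattern, not something the cell above requires): keep the body as a plain Python function and wrap it separately, so the logic can be unit-tested without starting a SparkSession.

```python
# Keep the UDF body as a plain function so it is testable without Spark.
def classify_amount_py(amount):
    if amount is None:
        return "missing"
    return "large" if amount >= 50 else "small"

# Ordinary unit tests, no JVM needed:
assert classify_amount_py(None) == "missing"
assert classify_amount_py(80.0) == "large"
assert classify_amount_py(25.5) == "small"

# Inside Spark code you would then wrap the same function, e.g.:
#   classify_amount = F.udf(classify_amount_py, StringType())
```

The wrapped version behaves identically in Spark, but the plain function is what your test suite exercises.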

4.3. Pandas UDFs

A pandas UDF receives each batch of rows as a pandas Series and returns a Series of the same length. Apache Arrow moves the data between the JVM and Python in a columnar format, so the per-row serialization overhead of a regular UDF is amortized across the batch. That is usually much better than a row-by-row Python UDF, but it is still Python execution and still needs careful type handling. In the physical plan, look for an ArrowEvalPython node.

[5]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def cents(amount: pd.Series) -> pd.Series:
    return amount.fillna(0).mul(100).round().astype("int64")

with_pandas_udf = events.withColumn("amount_cents", cents("amount"))
with_pandas_udf.show()
with_pandas_udf.explain("formatted")

+-------+-----------------+--------+------+------------+
|user_id|            email|     sku|amount|amount_cents|
+-------+-----------------+--------+------+------------+
|      1| ADA@example.COM |book-123|  25.5|        2550|
|      2|  ben@example.com|course-9|  80.0|        8000|
|      3|             NULL|book-777|  12.0|        1200|
|      4|  cid@example.com| unknown|  NULL|           0|
+-------+-----------------+--------+------+------------+

== Physical Plan ==
* Project (3)
+- ArrowEvalPython (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [4]: [user_id#0L, email#1, sku#2, amount#3]
Arguments: [user_id#0L, email#1, sku#2, amount#3], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) ArrowEvalPython
Input [4]: [user_id#0L, email#1, sku#2, amount#3]
Arguments: [cents(amount#3)#85L], [pythonUDF0#114L], 200

(3) Project [codegen id : 2]
Output [5]: [user_id#0L, email#1, sku#2, amount#3, pythonUDF0#114L AS amount_cents#86L]
Input [5]: [user_id#0L, email#1, sku#2, amount#3, pythonUDF0#114L]

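The same testing trick applies here, since a pandas UDF body is just a Series-to-Series function. Below, a local copy of the logic (the name `cents_local` is ours) is exercised with plain pandas; note how `fillna(0)` settles the NULL policy before the cast to `int64`, which would otherwise fail on NaN.

```python
import pandas as pd

# The pandas UDF body, as a plain Series -> Series function.
def cents_local(amount: pd.Series) -> pd.Series:
    # fillna must come before astype: casting NaN straight to int64 raises.
    return amount.fillna(0).mul(100).round().astype("int64")

print(cents_local(pd.Series([25.5, 80.0, 12.0, None])).tolist())
# [2550, 8000, 1200, 0]
```

The `round()` before the cast also matters: floating-point products like `25.5 * 100` are not guaranteed to be exact, and truncation without rounding can land one cent off.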

4.4. Keep Conditions Inside The Expression

Spark does not guarantee the evaluation order of subexpressions, so a guard such as WHEN does not reliably short-circuit the way `and`/`or` do in local Python: a UDF may still be handed the rows the guard was meant to exclude. If a function can fail on special rows, handle the condition inside the function itself or, better, express the whole condition with Spark SQL functions so no UDF is needed at all.

[6]:
safe_domains = events.select(
    "user_id",
    F.when(F.col("email").isNotNull(), F.split(F.lower(F.trim("email")), "@").getItem(1))
    .otherwise(F.lit("missing"))
    .alias("email_domain"),
)

safe_domains.show()

+-------+------------+
|user_id|email_domain|
+-------+------------+
|      1| example.com|
|      2| example.com|
|      3|     missing|
|      4| example.com|
+-------+------------+
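When a UDF truly is unavoidable, the defensive version of the same logic puts the guard inside the function body rather than in a surrounding WHEN. A sketch of that pattern in plain Python (the helper name `email_domain` is ours):

```python
# Guard inside the function: safe no matter which rows Spark feeds it.
def email_domain(email):
    if email is None:
        return "missing"
    parts = email.strip().lower().split("@")
    return parts[1] if len(parts) == 2 else "invalid"

assert email_domain(" ADA@example.COM ") == "example.com"
assert email_domain(None) == "missing"
assert email_domain("not-an-email") == "invalid"

# In Spark you would then wrap it, e.g.:
#   F.udf(email_domain, StringType())
```

Because the None and malformed cases are handled in the body, the function cannot raise regardless of how Spark orders evaluation.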

4.5. What To Remember

Built-ins are the default. They are visible to Catalyst and usually produce simpler plans. Pandas UDFs are useful for vectorized Python and libraries that expect pandas objects. Regular Python UDFs are the escape hatch, not the first tool.

[7]:
spark.stop()