3. Schemas, Bad Data, And Defensive Reads
Spark can infer schemas, but production jobs should usually declare schemas at the boundary. Explicit schemas make bad data visible earlier, make tests more stable, and avoid expensive inference passes. This chapter shows how to read imperfect local files and turn data quality assumptions into checks.
[1]:
from pathlib import Path
import shutil
from pyspark.sql import SparkSession, functions as F
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "schemas-bad-data"
if OUTPUT_DIR.exists():
shutil.rmtree(OUTPUT_DIR)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
spark = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-schemas-bad-data")
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print("Spark version:", spark.version)
print("Spark master:", sc.master)
Spark version: 3.5.8
Spark master: local[*]
3.1. Permissive JSON Reads
JSON is a useful format for showing malformed records because a single bad line can be isolated. The _bad_record column below captures lines Spark cannot parse into the declared schema.
[2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
raw_path = OUTPUT_DIR / "events.jsonl"
raw_path.write_text(
"".join([
'{"event_id": 1, "customer_id": 10, "amount": 12.50, "event_time": "2026-01-01"}\n',
'{"event_id": 2, "customer_id": 11, "amount": "oops", "event_time": "2026-01-02"}\n',
'not valid json\n',
'{"event_id": 3, "customer_id": 12, "amount": 8.00, "event_time": null}\n',
]),
encoding="utf-8",
)
schema = StructType([
StructField("event_id", IntegerType(), nullable=True),
StructField("customer_id", IntegerType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("event_time", StringType(), nullable=True),
StructField("_bad_record", StringType(), nullable=True),
])
raw = (
spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_bad_record")
.schema(schema)
.json(str(raw_path))
)
raw.show(truncate=False)
+--------+-----------+------+----------+--------------------------------------------------------------------------------+
|event_id|customer_id|amount|event_time|_bad_record |
+--------+-----------+------+----------+--------------------------------------------------------------------------------+
|1 |10 |12.5 |2026-01-01|NULL |
|2 |11 |NULL |2026-01-02|{"event_id": 2, "customer_id": 11, "amount": "oops", "event_time": "2026-01-02"}|
|NULL |NULL |NULL |NULL |not valid json |
|3 |12 |8.0 |NULL |NULL |
+--------+-----------+------+----------+--------------------------------------------------------------------------------+
3.2. Separate Parsing From Validation
A row can be syntactically valid but still fail business rules. Type mismatches, missing timestamps, impossible amounts, and unknown categories are data quality issues. Keep these checks explicit.
[3]:
checked = raw.withColumn(
"quality_status",
F.when(F.col("_bad_record").isNotNull(), "malformed")
.when(F.col("event_id").isNull(), "missing_event_id")
.when(F.col("amount").isNull(), "bad_amount")
.when(F.col("event_time").isNull(), "missing_event_time")
.otherwise("ok"),
)
checked.groupBy("quality_status").count().orderBy("quality_status").show()
checked.orderBy("event_id").show(truncate=False)
+------------------+-----+
| quality_status|count|
+------------------+-----+
| malformed| 2|
|missing_event_time| 1|
| ok| 1|
+------------------+-----+
+--------+-----------+------+----------+--------------------------------------------------------------------------------+------------------+
|event_id|customer_id|amount|event_time|_bad_record |quality_status |
+--------+-----------+------+----------+--------------------------------------------------------------------------------+------------------+
|NULL |NULL |NULL |NULL |not valid json |malformed |
|1 |10 |12.5 |2026-01-01|NULL |ok |
|2 |11 |NULL |2026-01-02|{"event_id": 2, "customer_id": 11, "amount": "oops", "event_time": "2026-01-02"}|malformed |
|3 |12 |8.0 |NULL |NULL |missing_event_time|
+--------+-----------+------+----------+--------------------------------------------------------------------------------+------------------+
3.3. Fail Fast When Bad Rows Should Stop The Job
Permissive mode is useful when a pipeline is designed to quarantine bad data. For strict inputs, use FAILFAST and let the read fail. The notebook prints the pattern instead of intentionally triggering an executor stack trace during documentation builds.
[4]:
failfast_pattern = """
spark.read \
.option("mode", "FAILFAST") \
.schema(schema) \
.json(input_path) \
.count()
""".strip()
print(failfast_pattern)
spark.read .option("mode", "FAILFAST") .schema(schema) .json(input_path) .count()
3.4. Defensive CSV Reads
CSV needs the same discipline: declare the schema, decide how to handle malformed rows, and make null handling explicit before downstream transformations depend on the data.
[5]:
csv_path = OUTPUT_DIR / "customers.csv"
csv_path.write_text(
"customer_id,region,age\n"
"1,north,34\n"
"2,south,not_an_age\n"
"3,,41\n",
encoding="utf-8",
)
customer_schema = StructType([
StructField("customer_id", IntegerType(), nullable=False),
StructField("region", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
])
customers = spark.read.option("header", True).schema(customer_schema).csv(str(csv_path))
customers.withColumn(
"region", F.coalesce("region", F.lit("unknown"))
).withColumn(
"age_status", F.when(F.col("age").isNull(), "bad_or_missing").otherwise("ok")
).show()
+-----------+-------+----+--------------+
|customer_id| region| age| age_status|
+-----------+-------+----+--------------+
| 1| north| 34| ok|
| 2| south|NULL|bad_or_missing|
| 3|unknown| 41| ok|
+-----------+-------+----+--------------+
3.5. What To Remember
Use explicit schemas at the boundary. Choose permissive, drop-malformed, or fail-fast mode intentionally. Then add quality columns or assertions that make invalid data visible before it becomes a silent modeling or reporting problem.
[6]:
spark.stop()