1. End-To-End Local Spark Project
The earlier chapters introduce Spark one topic at a time. This chapter puts several pieces together in one small workflow. We will create local raw data, read it with schemas, clean it with DataFrames, query it with SQL, write a curated Parquet dataset, and train a tiny machine learning pipeline.
The project is intentionally small enough to run on a laptop. The structure is the important part: raw data in, typed DataFrames, reusable features, durable output, and a model built from the curated table.
[1]:
from pathlib import Path
import csv
import json
import shutil
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
StructType,
StructField,
IntegerType,
StringType,
DoubleType,
)
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "end-to-end-project"
RAW_DIR = OUTPUT_DIR / "raw"
CURATED_DIR = OUTPUT_DIR / "curated" / "customer_features"
if OUTPUT_DIR.exists():
shutil.rmtree(OUTPUT_DIR)
RAW_DIR.mkdir(parents=True, exist_ok=True)
spark = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-end-to-end-project")
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print("Spark version:", spark.version)
print("Spark master:", sc.master)
Spark version: 3.5.8
Spark master: local[*]
1.1. Create A Small Raw Dataset
A real Spark project usually starts with files created by another system. Here we create those files locally so the notebook is self-contained. The customer table is CSV. The event table is JSON Lines, where each line is a separate JSON object.
[2]:
customers = [
{"customer_id": 1, "signup_channel": "search", "region": "north", "retained": 1},
{"customer_id": 2, "signup_channel": "social", "region": "south", "retained": 0},
{"customer_id": 3, "signup_channel": "search", "region": "west", "retained": 1},
{"customer_id": 4, "signup_channel": "email", "region": "north", "retained": 1},
{"customer_id": 5, "signup_channel": "social", "region": "east", "retained": 0},
{"customer_id": 6, "signup_channel": "email", "region": "west", "retained": 1},
{"customer_id": 7, "signup_channel": "search", "region": "south", "retained": 0},
{"customer_id": 8, "signup_channel": "social", "region": "north", "retained": 0},
{"customer_id": 9, "signup_channel": "email", "region": "east", "retained": 1},
{"customer_id": 10, "signup_channel": "search", "region": "north", "retained": 1},
{"customer_id": 11, "signup_channel": "social", "region": "west", "retained": 0},
{"customer_id": 12, "signup_channel": "email", "region": "south", "retained": 1},
]
events = [
{"customer_id": 1, "event_time": "2026-01-01 09:00:00", "event_type": "view", "amount": 0.0, "session_minutes": 8.0},
{"customer_id": 1, "event_time": "2026-01-01 09:10:00", "event_type": "purchase", "amount": 38.0, "session_minutes": 12.0},
{"customer_id": 2, "event_time": "2026-01-01 10:00:00", "event_type": "view", "amount": 0.0, "session_minutes": 3.0},
{"customer_id": 2, "event_time": "2026-01-02 10:20:00", "event_type": "support", "amount": 0.0, "session_minutes": 6.0},
{"customer_id": 3, "event_time": "2026-01-01 11:00:00", "event_type": "purchase", "amount": 95.0, "session_minutes": 15.0},
{"customer_id": 3, "event_time": "2026-01-03 11:30:00", "event_type": "purchase", "amount": 22.0, "session_minutes": 10.0},
{"customer_id": 4, "event_time": "2026-01-02 08:30:00", "event_type": "view", "amount": 0.0, "session_minutes": 5.0},
{"customer_id": 4, "event_time": "2026-01-02 08:40:00", "event_type": "purchase", "amount": 48.0, "session_minutes": 11.0},
{"customer_id": 5, "event_time": "2026-01-03 13:15:00", "event_type": "view", "amount": 0.0, "session_minutes": 4.0},
{"customer_id": 6, "event_time": "2026-01-01 15:00:00", "event_type": "purchase", "amount": 70.0, "session_minutes": 14.0},
{"customer_id": 6, "event_time": "2026-01-04 15:10:00", "event_type": "purchase", "amount": 18.0, "session_minutes": 9.0},
{"customer_id": 7, "event_time": "2026-01-02 16:00:00", "event_type": "view", "amount": 0.0, "session_minutes": 2.0},
{"customer_id": 8, "event_time": "2026-01-03 17:00:00", "event_type": "view", "amount": 0.0, "session_minutes": 4.0},
{"customer_id": 9, "event_time": "2026-01-02 09:30:00", "event_type": "purchase", "amount": 65.0, "session_minutes": 12.0},
{"customer_id": 10, "event_time": "2026-01-04 10:10:00", "event_type": "purchase", "amount": 31.0, "session_minutes": 8.0},
{"customer_id": 10, "event_time": "2026-01-05 10:10:00", "event_type": "purchase", "amount": 45.0, "session_minutes": 13.0},
{"customer_id": 11, "event_time": "2026-01-02 12:00:00", "event_type": "view", "amount": 0.0, "session_minutes": 3.0},
{"customer_id": 12, "event_time": "2026-01-05 14:20:00", "event_type": "purchase", "amount": 55.0, "session_minutes": 11.0},
]
with (RAW_DIR / "customers.csv").open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["customer_id", "signup_channel", "region", "retained"])
writer.writeheader()
writer.writerows(customers)
with (RAW_DIR / "events.jsonl").open("w", encoding="utf-8") as f:
for event in events:
f.write(json.dumps(event) + "\n")
sorted(path.name for path in RAW_DIR.iterdir())
[2]:
['customers.csv', 'events.jsonl']
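The JSON Lines layout can be checked with nothing but the standard library: every line is a complete JSON document, so a reader can parse the file one line at a time without loading the whole thing. A minimal sketch (the two records here are illustrative, not the notebook's event data):

```python
import io
import json

# Two JSON Lines records: one complete JSON object per line.
raw = io.StringIO(
    '{"customer_id": 1, "event_type": "view", "amount": 0.0}\n'
    '{"customer_id": 1, "event_type": "purchase", "amount": 38.0}\n'
)

# Parse line by line; no line depends on any other line,
# which is what makes the format easy to split and stream.
records = [json.loads(line) for line in raw if line.strip()]

print(len(records))              # 2
print(records[1]["event_type"])  # purchase
```

This independence between lines is also why Spark can split a large JSON Lines file across tasks, which a single top-level JSON array would not allow.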
1.2. Read With Schemas
Explicit schemas make examples more verbose, but they remove ambiguity. Spark does not have to infer types, and a malformed input file surfaces quickly instead of silently producing mis-typed columns: by default Spark fills unparsable fields with nulls, and with the read option mode set to FAILFAST it raises on the first bad record.
[3]:
customer_schema = StructType([
StructField("customer_id", IntegerType(), nullable=False),
StructField("signup_channel", StringType(), nullable=False),
StructField("region", StringType(), nullable=False),
StructField("retained", IntegerType(), nullable=False),
])
event_schema = StructType([
StructField("customer_id", IntegerType(), nullable=False),
StructField("event_time", StringType(), nullable=False),
StructField("event_type", StringType(), nullable=False),
StructField("amount", DoubleType(), nullable=False),
StructField("session_minutes", DoubleType(), nullable=False),
])
customer_df = (
spark.read
.option("header", True)
.schema(customer_schema)
.csv(str(RAW_DIR / "customers.csv"))
)
event_df = spark.read.schema(event_schema).json(str(RAW_DIR / "events.jsonl"))
customer_df.show()
event_df.show(5, truncate=False)
+-----------+--------------+------+--------+
|customer_id|signup_channel|region|retained|
+-----------+--------------+------+--------+
| 1| search| north| 1|
| 2| social| south| 0|
| 3| search| west| 1|
| 4| email| north| 1|
| 5| social| east| 0|
| 6| email| west| 1|
| 7| search| south| 0|
| 8| social| north| 0|
| 9| email| east| 1|
| 10| search| north| 1|
| 11| social| west| 0|
| 12| email| south| 1|
+-----------+--------------+------+--------+
+-----------+-------------------+----------+------+---------------+
|customer_id|event_time |event_type|amount|session_minutes|
+-----------+-------------------+----------+------+---------------+
|1 |2026-01-01 09:00:00|view |0.0 |8.0 |
|1 |2026-01-01 09:10:00|purchase |38.0 |12.0 |
|2 |2026-01-01 10:00:00|view |0.0 |3.0 |
|2 |2026-01-02 10:20:00|support |0.0 |6.0 |
|3 |2026-01-01 11:00:00|purchase |95.0 |15.0 |
+-----------+-------------------+----------+------+---------------+
only showing top 5 rows
1.3. Clean And Feature Engineer
The raw events have a timestamp string and one row per event. The curated table should have one row per customer. We parse the timestamp, derive an event date, aggregate behavior, and join the aggregates back to the customer attributes.
[4]:
clean_events = (
event_df
.withColumn("event_time", F.to_timestamp("event_time"))
.withColumn("event_date", F.to_date("event_time"))
.withColumn("is_purchase", (F.col("event_type") == "purchase").cast("int"))
)
behavior = (
clean_events
.groupBy("customer_id")
.agg(
F.count("*").alias("event_count"),
F.sum("is_purchase").alias("purchase_count"),
F.round(F.sum("amount"), 2).alias("total_amount"),
F.countDistinct("event_date").alias("active_days"),
F.round(F.avg("session_minutes"), 2).alias("avg_session_minutes"),
)
)
model_table = (
customer_df
.join(behavior, on="customer_id", how="left")
.fillna({
"event_count": 0,
"purchase_count": 0,
"total_amount": 0.0,
"active_days": 0,
"avg_session_minutes": 0.0,
})
)
model_table.orderBy("customer_id").show()
+-----------+--------------+------+--------+-----------+--------------+------------+-----------+-------------------+
|customer_id|signup_channel|region|retained|event_count|purchase_count|total_amount|active_days|avg_session_minutes|
+-----------+--------------+------+--------+-----------+--------------+------------+-----------+-------------------+
| 1| search| north| 1| 2| 1| 38.0| 1| 10.0|
| 2| social| south| 0| 2| 0| 0.0| 2| 4.5|
| 3| search| west| 1| 2| 2| 117.0| 2| 12.5|
| 4| email| north| 1| 2| 1| 48.0| 1| 8.0|
| 5| social| east| 0| 1| 0| 0.0| 1| 4.0|
| 6| email| west| 1| 2| 2| 88.0| 2| 11.5|
| 7| search| south| 0| 1| 0| 0.0| 1| 2.0|
| 8| social| north| 0| 1| 0| 0.0| 1| 4.0|
| 9| email| east| 1| 1| 1| 65.0| 1| 12.0|
| 10| search| north| 1| 2| 2| 76.0| 2| 10.5|
| 11| social| west| 0| 1| 0| 0.0| 1| 3.0|
| 12| email| south| 1| 1| 1| 55.0| 1| 11.0|
+-----------+--------------+------+--------+-----------+--------------+------------+-----------+-------------------+
1.4. Query With SQL
DataFrames and SQL are two faces of the same structured API. Once a DataFrame is registered as a temporary view, SQL is often the most direct way to express summaries for analysts or reporting code.
[5]:
model_table.createOrReplaceTempView("customer_features")
clean_events.createOrReplaceTempView("events")
spark.sql("""
SELECT
signup_channel,
region,
COUNT(*) AS customers,
ROUND(AVG(retained), 2) AS retention_rate,
ROUND(AVG(total_amount), 2) AS avg_total_amount
FROM customer_features
GROUP BY signup_channel, region
ORDER BY signup_channel, region
""").show()
spark.sql("""
SELECT
event_date,
ROUND(SUM(amount), 2) AS revenue
FROM events
GROUP BY event_date
ORDER BY event_date
""").show()
+--------------+------+---------+--------------+----------------+
|signup_channel|region|customers|retention_rate|avg_total_amount|
+--------------+------+---------+--------------+----------------+
| email| east| 1| 1.0| 65.0|
| email| north| 1| 1.0| 48.0|
| email| south| 1| 1.0| 55.0|
| email| west| 1| 1.0| 88.0|
| search| north| 2| 1.0| 57.0|
| search| south| 1| 0.0| 0.0|
| search| west| 1| 1.0| 117.0|
| social| east| 1| 0.0| 0.0|
| social| north| 1| 0.0| 0.0|
| social| south| 1| 0.0| 0.0|
| social| west| 1| 0.0| 0.0|
+--------------+------+---------+--------------+----------------+
+----------+-------+
|event_date|revenue|
+----------+-------+
|2026-01-01| 203.0|
|2026-01-02| 113.0|
|2026-01-03| 22.0|
|2026-01-04| 49.0|
|2026-01-05| 100.0|
+----------+-------+
1.5. Write A Curated Parquet Dataset
Parquet is a common Spark storage format because it is columnar, typed, splittable, and efficient for analytical queries. Here we partition by signup_channel so Spark can skip unrelated directories when a query filters on that column.
[6]:
(
model_table
.write
.mode("overwrite")
.partitionBy("signup_channel")
.parquet(str(CURATED_DIR))
)
curated = spark.read.parquet(str(CURATED_DIR))
curated.printSchema()
curated.orderBy("customer_id").show()
root
|-- customer_id: integer (nullable = true)
|-- region: string (nullable = true)
|-- retained: integer (nullable = true)
|-- event_count: long (nullable = true)
|-- purchase_count: long (nullable = true)
|-- total_amount: double (nullable = true)
|-- active_days: long (nullable = true)
|-- avg_session_minutes: double (nullable = true)
|-- signup_channel: string (nullable = true)
+-----------+------+--------+-----------+--------------+------------+-----------+-------------------+--------------+
|customer_id|region|retained|event_count|purchase_count|total_amount|active_days|avg_session_minutes|signup_channel|
+-----------+------+--------+-----------+--------------+------------+-----------+-------------------+--------------+
| 1| north| 1| 2| 1| 38.0| 1| 10.0| search|
| 2| south| 0| 2| 0| 0.0| 2| 4.5| social|
| 3| west| 1| 2| 2| 117.0| 2| 12.5| search|
| 4| north| 1| 2| 1| 48.0| 1| 8.0| email|
| 5| east| 0| 1| 0| 0.0| 1| 4.0| social|
| 6| west| 1| 2| 2| 88.0| 2| 11.5| email|
| 7| south| 0| 1| 0| 0.0| 1| 2.0| search|
| 8| north| 0| 1| 0| 0.0| 1| 4.0| social|
| 9| east| 1| 1| 1| 65.0| 1| 12.0| email|
| 10| north| 1| 2| 2| 76.0| 2| 10.5| search|
| 11| west| 0| 1| 0| 0.0| 1| 3.0| social|
| 12| south| 1| 1| 1| 55.0| 1| 11.0| email|
+-----------+------+--------+-----------+--------------+------------+-----------+-------------------+--------------+
1.6. Train A Tiny ML Pipeline
The model below predicts the retained label from behavioral and categorical features. The dataset is too small for a real evaluation, so treat this as a pipeline demonstration rather than a serious model. The useful part is the shape: index categorical columns, encode them, assemble all features, fit a classifier, and score rows.
[7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
training = spark.read.parquet(str(CURATED_DIR))
channel_indexer = StringIndexer(
inputCol="signup_channel",
outputCol="signup_channel_index",
handleInvalid="keep",
)
region_indexer = StringIndexer(
inputCol="region",
outputCol="region_index",
handleInvalid="keep",
)
encoder = OneHotEncoder(
inputCols=["signup_channel_index", "region_index"],
outputCols=["signup_channel_vec", "region_vec"],
)
assembler = VectorAssembler(
inputCols=[
"event_count",
"purchase_count",
"total_amount",
"active_days",
"avg_session_minutes",
"signup_channel_vec",
"region_vec",
],
outputCol="features",
)
classifier = LogisticRegression(
featuresCol="features",
labelCol="retained",
maxIter=20,
regParam=0.1,
)
pipeline = Pipeline(stages=[channel_indexer, region_indexer, encoder, assembler, classifier])
model = pipeline.fit(training)
predictions = model.transform(training)
predictions.select(
"customer_id",
"signup_channel",
"region",
"retained",
"prediction",
"probability",
).orderBy("customer_id").show(truncate=False)
evaluator = BinaryClassificationEvaluator(labelCol="retained", rawPredictionCol="rawPrediction")
print("training AUC:", round(evaluator.evaluate(predictions), 3))
+-----------+--------------+------+--------+----------+-----------------------------------------+
|customer_id|signup_channel|region|retained|prediction|probability |
+-----------+--------------+------+--------+----------+-----------------------------------------+
|1 |search |north |1 |1.0 |[0.11813549652974949,0.8818645034702505] |
|2 |social |south |0 |0.0 |[0.9074118462620312,0.0925881537379688] |
|3 |search |west |1 |1.0 |[0.024824789274493213,0.9751752107255068]|
|4 |email |north |1 |1.0 |[0.06879814555326737,0.9312018544467326] |
|5 |social |east |0 |0.0 |[0.9252207819282422,0.0747792180717578] |
|6 |email |west |1 |1.0 |[0.019446932902371948,0.980553067097628] |
|7 |search |south |0 |0.0 |[0.8693523037314035,0.1306476962685965] |
|8 |social |north |0 |0.0 |[0.8903862350037638,0.10961376499623621] |
|9 |email |east |1 |1.0 |[0.07035156182898863,0.9296484381710114] |
|10 |search |north |1 |1.0 |[0.038010634526183466,0.9619893654738165]|
|11 |social |west |0 |0.0 |[0.9391171360016877,0.06088286399831233] |
|12 |email |south |1 |1.0 |[0.12894423085528636,0.8710557691447136] |
+-----------+--------------+------+--------+----------+-----------------------------------------+
training AUC: 1.0
1.7. What To Remember
This local project follows the same broad pattern as a larger Spark job.
Use schemas at the boundary.
Convert raw records into a typed, queryable shape.
Use SQL when it makes summaries clearer.
Write curated data in a format that Spark can scan efficiently.
Build ML pipelines from stable feature tables, not one-off intermediate objects.
The data is tiny, but the workflow is the transferable lesson.
[8]:
spark.stop()