2. Structured Streaming
Structured Streaming is the modern Spark streaming API. The main idea is that a stream can be treated like a table that keeps receiving new rows. You describe transformations with the same DataFrame operations used for batch work, and Spark turns that description into a continuously updated query.
This chapter uses local files as the stream source so that the example runs in Docker with local[*]. There is no socket server, HDFS directory, Kafka broker, or standalone Spark cluster. The local setup is small, but the mental model is the same one used in larger deployments: define a source, transform rows, choose an output mode, and attach a checkpoint when the result must be recoverable.
2.1. The Streaming Model
A Structured Streaming query has four important parts.
A source supplies new rows. Sources include files, Kafka, rate streams, and other connectors.
A logical plan describes transformations using DataFrame code.
A sink receives the output. Common sinks include memory tables, files, Kafka, and
foreachBatchfunctions.A trigger decides when Spark checks for new data and runs another micro-batch.
For teaching and testing, the file source is a good fit because adding another file is like another small batch of events arriving.
[1]:
from pathlib import Path
import json
import shutil
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "structured-streaming"
INPUT_DIR = OUTPUT_DIR / "input"
CHECKPOINT_DIR = OUTPUT_DIR / "checkpoints"
PARQUET_DIR = OUTPUT_DIR / "events-parquet"
if OUTPUT_DIR.exists():
shutil.rmtree(OUTPUT_DIR)
INPUT_DIR.mkdir(parents=True, exist_ok=True)
spark = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-structured-streaming")
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print("Spark version:", spark.version)
print("Spark master:", sc.master)
Spark version: 3.5.8
Spark master: local[*]
2.2. A Local Event Stream
The records below represent web and purchase events. Each JSON line is one event. In a real application, new data would arrive over time. In this local notebook, we create a few files before the streaming query starts so the whole example can finish deterministically during automated notebook execution.
[2]:
def write_json_lines(path, records):
with path.open("w", encoding="utf-8") as f:
for record in records:
f.write(json.dumps(record) + "\n")
write_json_lines(
INPUT_DIR / "batch-001.json",
[
{"event_id": "e001", "user_id": "u01", "event_type": "view", "event_time": "2026-01-01 09:00:00", "amount": 0.0},
{"event_id": "e002", "user_id": "u01", "event_type": "purchase", "event_time": "2026-01-01 09:04:00", "amount": 24.50},
{"event_id": "e003", "user_id": "u02", "event_type": "view", "event_time": "2026-01-01 09:05:00", "amount": 0.0},
],
)
write_json_lines(
INPUT_DIR / "batch-002.json",
[
{"event_id": "e004", "user_id": "u02", "event_type": "purchase", "event_time": "2026-01-01 09:08:00", "amount": 11.25},
{"event_id": "e005", "user_id": "u03", "event_type": "view", "event_time": "2026-01-01 10:01:00", "amount": 0.0},
{"event_id": "e006", "user_id": "u03", "event_type": "purchase", "event_time": "2026-01-01 10:03:00", "amount": 63.00},
],
)
sorted(path.name for path in INPUT_DIR.glob("*.json"))
[2]:
['batch-001.json', 'batch-002.json']
2.3. Reading A Stream
readStream defines a streaming DataFrame. The DataFrame is lazy, just like a batch DataFrame. The cell below does not read any data yet. It only tells Spark what the incoming rows will look like and where new files should be discovered.
[3]:
schema = StructType([
StructField("event_id", StringType(), nullable=False),
StructField("user_id", StringType(), nullable=False),
StructField("event_type", StringType(), nullable=False),
StructField("event_time", StringType(), nullable=False),
StructField("amount", DoubleType(), nullable=False),
])
raw_events = spark.readStream.schema(schema).json(str(INPUT_DIR))
events = raw_events.withColumn("event_time", F.to_timestamp("event_time"))
events.isStreaming
[3]:
True
2.4. Aggregating Into A Memory Sink
A memory sink is useful for lessons because the result can be queried with SQL after the stream finishes. It is not a durable production sink, but it is perfect for showing how a streaming DataFrame updates an aggregate.
The query below uses availableNow=True, which tells Spark to process all currently available files and then stop. That makes the notebook repeatable while still exercising the streaming engine.
[4]:
spend_by_user = (
events
.groupBy("user_id")
.agg(
F.count("*").alias("events"),
F.round(F.sum("amount"), 2).alias("total_amount"),
)
)
query = (
spend_by_user.writeStream
.format("memory")
.queryName("local_spend_by_user")
.outputMode("complete")
.trigger(availableNow=True)
.option("checkpointLocation", str(CHECKPOINT_DIR / "spend-by-user"))
.start()
)
query.awaitTermination()
spark.sql("""
SELECT user_id, events, total_amount
FROM local_spend_by_user
ORDER BY user_id
""").show()
+-------+------+------------+
|user_id|events|total_amount|
+-------+------+------------+
| u01| 2| 24.5|
| u02| 2| 11.25|
| u03| 2| 63.0|
+-------+------+------------+
2.5. Inspecting Progress
Every streaming query records progress metadata. This is useful when a query appears slow, when a source stops receiving data, or when you want to verify how many rows a micro-batch processed.
[5]:
progress = query.recentProgress
print("total input rows:", sum(item["numInputRows"] for item in progress))
total input rows: 6
2.6. Event-Time Windows
Streaming jobs often group events by event time rather than by processing time. The window() function groups timestamps into fixed intervals. A watermark tells Spark how late data is allowed to be before Spark can safely clean up state.
This example is small, so the watermark does not remove anything. It is included because event-time windows and watermarks usually belong together in real streaming jobs.
[6]:
windowed_events = (
events
.withWatermark("event_time", "10 minutes")
.groupBy(F.window("event_time", "1 hour"), "event_type")
.agg(
F.count("*").alias("events"),
F.round(F.sum("amount"), 2).alias("amount"),
)
)
query = (
windowed_events.writeStream
.format("memory")
.queryName("local_events_by_hour")
.outputMode("complete")
.trigger(availableNow=True)
.option("checkpointLocation", str(CHECKPOINT_DIR / "events-by-hour"))
.start()
)
query.awaitTermination()
spark.sql("""
SELECT
date_format(window.start, 'yyyy-MM-dd HH:mm') AS hour,
event_type,
events,
amount
FROM local_events_by_hour
ORDER BY hour, event_type
""").show(truncate=False)
+----------------+----------+------+------+
|hour |event_type|events|amount|
+----------------+----------+------+------+
|2026-01-01 09:00|purchase |2 |35.75 |
|2026-01-01 09:00|view |2 |0.0 |
|2026-01-01 10:00|purchase |1 |63.0 |
|2026-01-01 10:00|view |1 |0.0 |
+----------------+----------+------+------+
2.7. Writing A File Sink
The same stream can also be written to a local Parquet directory. A checkpoint directory lets Spark remember which files have already been processed for that sink. In this notebook we run once and stop, but the checkpoint path is still part of the habit worth learning.
[7]:
query = (
events
.select("event_id", "user_id", "event_type", "event_time", "amount")
.writeStream
.format("parquet")
.outputMode("append")
.trigger(availableNow=True)
.option("path", str(PARQUET_DIR))
.option("checkpointLocation", str(CHECKPOINT_DIR / "events-parquet"))
.start()
)
query.awaitTermination()
spark.read.parquet(str(PARQUET_DIR)).orderBy("event_id").show(truncate=False)
+--------+-------+----------+-------------------+------+
|event_id|user_id|event_type|event_time |amount|
+--------+-------+----------+-------------------+------+
|e001 |u01 |view |2026-01-01 09:00:00|0.0 |
|e002 |u01 |purchase |2026-01-01 09:04:00|24.5 |
|e003 |u02 |view |2026-01-01 09:05:00|0.0 |
|e004 |u02 |purchase |2026-01-01 09:08:00|11.25 |
|e005 |u03 |view |2026-01-01 10:01:00|0.0 |
|e006 |u03 |purchase |2026-01-01 10:03:00|63.0 |
+--------+-------+----------+-------------------+------+
2.8. What To Remember
Structured Streaming is not a separate programming model from DataFrames. It is the DataFrame model applied to data that does not have a fixed end. In local examples, availableNow and file sources make streaming easy to test. In production, the same query shape usually changes only in the source, sink, checkpoint location, and operational settings.
[8]:
spark.stop()