3. Packaging Spark Jobs
Notebook code is useful for exploration, but scheduled Spark work should usually run as a script or package. A packaged job has an explicit entry point, explicit input and output arguments, and Spark configuration supplied by the submit command or the runtime environment.
This chapter writes and runs a tiny local job with spark-submit so the packaging pattern is visible without requiring a cluster.
[1]:
from pathlib import Path
import json
import shutil
import subprocess
import textwrap
from pyspark.sql import SparkSession
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "packaging-spark-jobs"
JOB_DIR = OUTPUT_DIR / "job"
INPUT_DIR = OUTPUT_DIR / "input"
RESULT_DIR = OUTPUT_DIR / "daily_sales"
if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR)
JOB_DIR.mkdir(parents=True, exist_ok=True)
INPUT_DIR.mkdir(parents=True, exist_ok=True)
print("work directory:", OUTPUT_DIR.relative_to(DATA_DIR))
work directory: _spark_output/packaging-spark-jobs
3.1. A Script Entry Point
The script below has three parts: a pure transformation function, argument parsing, and a main() function that creates the Spark session. This separation makes the transformation easier to test and the job easier to run from different environments.
[2]:
job_path = JOB_DIR / "daily_sales_job.py"
job_path.write_text(textwrap.dedent('''
import argparse

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def build_daily_sales(spark, input_path):
    schema = StructType([
        StructField("day", StringType(), nullable=False),
        StructField("category", StringType(), nullable=False),
        StructField("amount", DoubleType(), nullable=False),
    ])
    sales = spark.read.schema(schema).json(input_path)
    return (
        sales
        .groupBy("day", "category")
        .agg(F.round(F.sum("amount"), 2).alias("revenue"))
    )


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    return parser.parse_args()


def main():
    args = parse_args()
    spark = (
        SparkSession.builder
        .appName("daily-sales-job")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")
    result = build_daily_sales(spark, args.input)
    result.write.mode("overwrite").parquet(args.output)
    spark.stop()


if __name__ == "__main__":
    main()
''').strip() + "\n", encoding="utf-8")
print(job_path.relative_to(DATA_DIR))
_spark_output/packaging-spark-jobs/job/daily_sales_job.py
3.2. Local Input Files
The job should not depend on notebook variables. Inputs are files and command-line arguments. That is the same shape a scheduler or production orchestrator would use.
[3]:
records = [
    {"day": "2026-01-01", "category": "book", "amount": 10.0},
    {"day": "2026-01-01", "category": "book", "amount": 7.5},
    {"day": "2026-01-01", "category": "course", "amount": 20.0},
    {"day": "2026-01-02", "category": "book", "amount": 11.0},
]
with (INPUT_DIR / "sales.jsonl").open("w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
sorted(path.name for path in INPUT_DIR.iterdir())
[3]:
['sales.jsonl']
3.3. Run With spark-submit
In local mode, spark-submit still launches a driver process and runs the same script entry point that a cluster submission would run. Cluster deployments usually add packaging and dependency options, but the submission interface is the same.
[4]:
cmd = [
    "spark-submit",
    "--master", "local[*]",
    str(job_path),
    "--input", str(INPUT_DIR),
    "--output", str(RESULT_DIR),
]
completed = subprocess.run(cmd, text=True, capture_output=True, check=True)
print("return code:", completed.returncode)
print("stderr tail:", "\n".join(completed.stderr.splitlines()[-3:]))
return code: 0
stderr tail: 26/04/10 23:19:53 INFO BlockManagerMasterEndpoint: Registering block manager localhost:32881 with 434.4 MiB RAM, BlockManagerId(driver, localhost, 32881, None)
26/04/10 23:19:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 32881, None)
26/04/10 23:19:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 32881, None)
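For comparison, a cluster-style submission would keep the entry point and its --input/--output arguments unchanged and only add packaging and resource flags. The sketch below just builds the argument list; the YARN master, the libs.zip archive, and the s3:// paths are placeholders, not part of this chapter's setup.

```python
# Hypothetical cluster submission: same entry point and job arguments as the
# local run, plus dependency and resource flags. Master, libs.zip, and the
# s3:// paths are placeholders.
cluster_cmd = [
    "spark-submit",
    "--master", "yarn",                    # or k8s://..., spark://host:7077
    "--deploy-mode", "cluster",            # run the driver inside the cluster
    "--py-files", "libs.zip",              # ship helper modules with the job
    "--conf", "spark.executor.memory=4g",
    "--conf", "spark.sql.shuffle.partitions=200",
    "daily_sales_job.py",
    "--input", "s3://example-bucket/sales/",
    "--output", "s3://example-bucket/daily_sales/",
]
print(" ".join(cluster_cmd))
```

Everything before the script path configures Spark; everything after it is passed to the script's own argument parser, which is why parse_args() above only needs to know about --input and --output.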
3.4. Inspect The Result
The notebook starts a separate Spark session only to inspect the files written by the submitted job. In a real workflow, the next job or an analytical query would read the output path.
[5]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-packaging-reader")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
spark.read.parquet(str(RESULT_DIR)).orderBy("day", "category").show()
+----------+--------+-------+
| day|category|revenue|
+----------+--------+-------+
|2026-01-01| book| 17.5|
|2026-01-01| course| 20.0|
|2026-01-02| book| 11.0|
+----------+--------+-------+
3.5. What To Remember
A packaged Spark job should have a small entry point, explicit arguments, and transformation functions that can be tested without running the whole job. Use notebooks to learn and inspect; use scripts or packages for repeatable jobs.
[6]:
spark.stop()