3. Packaging Spark Jobs

Notebook code is useful for exploration, but scheduled Spark work should usually run as a script or package. A packaged job has an explicit entry point, explicit input and output arguments, and Spark configuration supplied by the submit command or the runtime environment.

This chapter writes and runs a tiny local job with spark-submit so the packaging pattern is visible without requiring a cluster.

[1]:
from pathlib import Path
import json
import shutil
import subprocess
import textwrap

from pyspark.sql import SparkSession

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "packaging-spark-jobs"
JOB_DIR = OUTPUT_DIR / "job"
INPUT_DIR = OUTPUT_DIR / "input"
RESULT_DIR = OUTPUT_DIR / "daily_sales"

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR)
JOB_DIR.mkdir(parents=True, exist_ok=True)
INPUT_DIR.mkdir(parents=True, exist_ok=True)

print("work directory:", OUTPUT_DIR.relative_to(DATA_DIR))

work directory: _spark_output/packaging-spark-jobs

3.1. A Script Entry Point

The script below has three parts: a pure transformation function, argument parsing, and a main() function that creates the Spark session. This separation makes the transformation easier to test and the job easier to run from different environments.

[2]:
job_path = JOB_DIR / "daily_sales_job.py"
job_path.write_text(textwrap.dedent('''
    import argparse

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType


    def build_daily_sales(spark, input_path):
        schema = StructType([
            StructField("day", StringType(), nullable=False),
            StructField("category", StringType(), nullable=False),
            StructField("amount", DoubleType(), nullable=False),
        ])
        sales = spark.read.schema(schema).json(input_path)
        return (
            sales
            .groupBy("day", "category")
            .agg(F.round(F.sum("amount"), 2).alias("revenue"))
        )


    def parse_args():
        parser = argparse.ArgumentParser()
        parser.add_argument("--input", required=True)
        parser.add_argument("--output", required=True)
        return parser.parse_args()


    def main():
        args = parse_args()
        spark = (
            SparkSession.builder
            .appName("daily-sales-job")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        spark.sparkContext.setLogLevel("ERROR")
        result = build_daily_sales(spark, args.input)
        result.write.mode("overwrite").parquet(args.output)
        spark.stop()


    if __name__ == "__main__":
        main()
''').strip() + "\n", encoding="utf-8")

print(job_path.relative_to(DATA_DIR))

_spark_output/packaging-spark-jobs/job/daily_sales_job.py

3.2. Local Input Files

The job should not depend on notebook variables. Inputs are files and command-line arguments. That is the same shape a scheduler or production orchestrator would use.

[3]:
records = [
    {"day": "2026-01-01", "category": "book", "amount": 10.0},
    {"day": "2026-01-01", "category": "book", "amount": 7.5},
    {"day": "2026-01-01", "category": "course", "amount": 20.0},
    {"day": "2026-01-02", "category": "book", "amount": 11.0},
]

with (INPUT_DIR / "sales.jsonl").open("w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

sorted(path.name for path in INPUT_DIR.iterdir())

[3]:
['sales.jsonl']

3.3. Run With spark-submit

In local mode, spark-submit still launches a driver process and runs the same script entry point that a cluster submission would run. Cluster deployments usually add packaging and dependency options, but the submission interface is the same.

[4]:
cmd = [
    "spark-submit",
    "--master", "local[*]",
    str(job_path),
    "--input", str(INPUT_DIR),
    "--output", str(RESULT_DIR),
]

completed = subprocess.run(cmd, text=True, capture_output=True, check=True)
print("return code:", completed.returncode)
print("stderr tail:", "\n".join(completed.stderr.splitlines()[-3:]))

return code: 0
stderr tail: 26/04/10 23:19:53 INFO BlockManagerMasterEndpoint: Registering block manager localhost:32881 with 434.4 MiB RAM, BlockManagerId(driver, localhost, 32881, None)
26/04/10 23:19:53 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, localhost, 32881, None)
26/04/10 23:19:53 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, localhost, 32881, None)
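
A cluster submission would extend the same command list with packaging and dependency flags. The sketch below uses documented spark-submit options, but the master URL, archive name, and paths are illustrative placeholders, not values from this job:

```python
# Hypothetical cluster-style submit command. The flags are real spark-submit
# options; every value (master URL, zip name, S3 paths) is a placeholder.
cluster_cmd = [
    "spark-submit",
    "--master", "spark://cluster-host:7077",  # illustrative cluster master
    "--deploy-mode", "cluster",               # run the driver on the cluster
    "--py-files", "job_utils.zip",            # ship extra Python modules
    "--conf", "spark.executor.memory=4g",     # per-executor resources
    "daily_sales_job.py",
    "--input", "s3://bucket/input/",          # illustrative input path
    "--output", "s3://bucket/daily_sales/",   # illustrative output path
]
print(" ".join(cluster_cmd))
```

Everything before the script path configures Spark itself; everything after it is passed to the script's own argument parser, exactly as in the local run above.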

3.4. Inspect The Result

The notebook starts a separate Spark session only to inspect the files written by the submitted job. In a real workflow, the next job or an analytical query would read the output path.

[5]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-packaging-reader")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.ui.showConsoleProgress", "false")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

spark.read.parquet(str(RESULT_DIR)).orderBy("day", "category").show()

+----------+--------+-------+
|       day|category|revenue|
+----------+--------+-------+
|2026-01-01|    book|   17.5|
|2026-01-01|  course|   20.0|
|2026-01-02|    book|   11.0|
+----------+--------+-------+

3.5. What To Remember

A packaged Spark job should have a small entry point, explicit arguments, and transformation functions that can be tested without running the whole job. Use notebooks to learn and inspect; use scripts or packages for repeatable jobs.
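
As one example of that testability, the argument-parsing part can be exercised without spark-submit at all. This sketch redefines the parser so it stands alone and passes an explicit argv list; note that the script in section 3.1 would need an optional argv parameter on parse_args to support this directly:

```python
import argparse

# Mirror of the job's parser. Accepting an optional argv list (an addition
# over the script in section 3.1) lets tests call it without sys.argv.
def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    return parser.parse_args(argv)

# No Spark session, no subprocess: just the parsing logic under test.
args = parse_args(["--input", "input/", "--output", "daily_sales/"])
print(args.input, args.output)
```

The transformation function can be tested the same way: pass a local SparkSession and a path to a small fixture file, and assert on the collected rows.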

[6]:
spark.stop()