1. Lakehouse Table Formats

Plain Parquet files are efficient, but a directory of Parquet files does not by itself provide table transactions, snapshot history, schema evolution rules, or row-level operations. Lakehouse table formats add metadata around data files so engines can treat the directory as a table.

This chapter uses local paths and a local Spark session. It uses plain Parquet as the baseline, Delta Lake to show a transaction log with updates and time travel, and Apache Iceberg to show catalog-backed tables and metadata tables. The goal is a mental model of table formats, not a production catalog setup.

[1]:
from pathlib import Path
import os
import shutil

from pyspark.sql import SparkSession, functions as F
from delta.tables import DeltaTable

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "lakehouse-formats"

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Collect comma-separated jar paths from the environment, skipping empty entries.
jars = []
for value in [os.environ.get("SPARK_DELTA_JARS", ""), os.environ.get("SPARK_ICEBERG_JAR", "")]:
    jars.extend([item for item in value.split(",") if item])

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-lakehouse-formats")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.jars", ",".join(jars))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", str(OUTPUT_DIR / "iceberg-warehouse"))
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("ERROR")

print("Spark version:", spark.version)
print("format jars:", len(jars))

Spark version: 3.5.8
format jars: 4
[2]:
orders = spark.createDataFrame(
    [(1, "book", 10.0), (2, "course", 20.0), (3, "book", 7.5)],
    ["order_id", "category", "amount"],
)
orders.show()

+--------+--------+------+
|order_id|category|amount|
+--------+--------+------+
|       1|    book|  10.0|
|       2|  course|  20.0|
|       3|    book|   7.5|
+--------+--------+------+

1.1. Parquet Baseline

Parquet is a file format. Spark can read and write it efficiently, but the table state is simply whatever set of files is in the directory at read time. A plain Parquet path has no transaction log.

[3]:
parquet_path = OUTPUT_DIR / "plain-parquet" / "orders"
orders.write.mode("overwrite").parquet(str(parquet_path))

print("Parquet directory entries:")
for path in sorted(parquet_path.iterdir()):
    print(path.name)

spark.read.parquet(str(parquet_path)).orderBy("order_id").show()

Parquet directory entries:
._SUCCESS.crc
.part-00000-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet.crc
.part-00001-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet.crc
.part-00002-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet.crc
.part-00003-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet.crc
_SUCCESS
part-00000-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet
part-00001-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet
part-00002-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet
part-00003-9e197f38-9d7b-4565-9154-f93c8d981b72-c000.snappy.parquet
+--------+--------+------+
|order_id|category|amount|
+--------+--------+------+
|       1|    book|  10.0|
|       2|  course|  20.0|
|       3|    book|   7.5|
+--------+--------+------+
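
The idea that a plain Parquet table is "just the directory listing" can be illustrated without Spark. The sketch below is a simplified stand-in for a reader: it lists a directory and keeps the files whose names do not start with `_` or `.`, the naming convention Spark uses for marker and checksum files such as `_SUCCESS`. The helper name `visible_data_files` and the placeholder file names are hypothetical, and the files are empty stand-ins rather than real Parquet data.

```python
import tempfile
from pathlib import Path


def visible_data_files(directory: Path) -> list[str]:
    """Return the file names a reader would treat as table data."""
    return sorted(
        p.name
        for p in directory.iterdir()
        # Names starting with "_" or "." are treated as markers/checksums, not data.
        if p.is_file() and not p.name.startswith(("_", "."))
    )


with tempfile.TemporaryDirectory() as tmp:
    table_dir = Path(tmp)
    for name in ["_SUCCESS", "._SUCCESS.crc", "part-00000.snappy.parquet"]:
        (table_dir / name).touch()
    before = visible_data_files(table_dir)

    # Another writer drops a file into the directory. Nothing records whether
    # that write is complete, so the "table" changes as soon as the file appears.
    (table_dir / "part-00001.snappy.parquet").touch()
    after = visible_data_files(table_dir)

print(before)
print(after)
```

The table contents changed between the two listings without any commit step, which is the gap that a table format's metadata is meant to close.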

1.2. Delta Lake

Delta Lake stores data files plus a _delta_log directory. Each commit recorded in the log produces a new table version, which is what makes operations such as updates and time travel possible. Delta’s quick start likewise uses local paths to explore these features with Spark.

[4]:
delta_path = OUTPUT_DIR / "delta" / "orders"
orders.write.format("delta").mode("overwrite").save(str(delta_path))

delta_table = DeltaTable.forPath(spark, str(delta_path))
# Row-level update: add 1 to every 'book' order. This commits a new table version.
delta_table.update("category = 'book'", {"amount": "amount + 1"})

print("Delta log exists:", (delta_path / "_delta_log").exists())
delta_table.toDF().orderBy("order_id").show()
delta_table.history().select("version", "operation").orderBy("version").show(truncate=False)

# Time travel: read the table as it was at version 0, before the update.
print("Version 0")
spark.read.format("delta").option("versionAsOf", 0).load(str(delta_path)).orderBy("order_id").show()

Delta log exists: True
+--------+--------+------+
|order_id|category|amount|
+--------+--------+------+
|       1|    book|  11.0|
|       2|  course|  20.0|
|       3|    book|   8.5|
+--------+--------+------+

+-------+---------+
|version|operation|
+-------+---------+
|0      |WRITE    |
|1      |UPDATE   |
+-------+---------+

Version 0
+--------+--------+------+
|order_id|category|amount|
+--------+--------+------+
|       1|    book|  10.0|
|       2|  course|  20.0|
|       3|    book|   7.5|
+--------+--------+------+
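
To see why a log makes time travel possible, it helps to replay one by hand. Delta commit files in `_delta_log` are numbered JSON files with one action per line, and `add` and `remove` actions describe which data files belong to the table. The sketch below is a toy model under that assumption: `TOY_LOG`, `live_files`, and the file names are hypothetical, and real commits carry more information (protocol, metadata, commit info, statistics) that is ignored here.

```python
import json

# Hypothetical, heavily simplified commits: version number -> newline-delimited
# JSON actions. Version 1 mimics an update that rewrites one data file.
TOY_LOG = {
    0: "\n".join([
        json.dumps({"add": {"path": "part-a.parquet"}}),
        json.dumps({"add": {"path": "part-b.parquet"}}),
    ]),
    1: "\n".join([
        json.dumps({"remove": {"path": "part-a.parquet"}}),
        json.dumps({"add": {"path": "part-c.parquet"}}),
    ]),
}


def live_files(log: dict[int, str], version: int) -> list[str]:
    """Replay commits 0..version and return the data files visible at that version."""
    files: set[str] = set()
    for v in sorted(log):
        if v > version:
            break
        for line in log[v].splitlines():
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return sorted(files)


print(live_files(TOY_LOG, 0))  # ['part-a.parquet', 'part-b.parquet']
print(live_files(TOY_LOG, 1))  # ['part-b.parquet', 'part-c.parquet']
```

Reading "as of version 0" only means stopping the replay early. The removed file is no longer part of the current table, but it must still exist on disk for the older version to be readable.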

1.3. Apache Iceberg

Spark reaches Iceberg tables through a catalog, here the one registered as local in the session configuration. This local example uses a Hadoop catalog rooted in a local warehouse directory. Iceberg also exposes metadata tables, such as snapshots, through SQL.

[5]:
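# Create the namespace in the Iceberg catalog named "local", then create
# (or replace) the table from the DataFrame.
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.demo")
orders.writeTo("local.demo.orders").createOrReplace()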

spark.sql("SELECT * FROM local.demo.orders ORDER BY order_id").show()
spark.sql("SELECT snapshot_id, operation FROM local.demo.orders.snapshots").show(truncate=False)

+--------+--------+------+
|order_id|category|amount|
+--------+--------+------+
|       1|    book|  10.0|
|       2|  course|  20.0|
|       3|    book|   7.5|
+--------+--------+------+

+------------------+---------+
|snapshot_id       |operation|
+------------------+---------+
|863585270242635520|overwrite|
+------------------+---------+
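
A Hadoop catalog is a convenient teaching choice because the table identifier maps onto the filesystem. The sketch below assumes the usual Hadoop catalog layout, in which the namespace and table name become nested directories under the warehouse and table metadata lives in a `metadata` subdirectory. The helper `hadoop_table_location` is hypothetical and only computes paths. It does not call Iceberg, so verify the result against your own warehouse directory.

```python
from pathlib import Path


def hadoop_table_location(warehouse: Path, identifier: str, catalog: str = "local") -> Path:
    """Map 'catalog.namespace.table' to the directory a Hadoop catalog is assumed to use."""
    parts = identifier.split(".")
    if parts and parts[0] == catalog:
        # The catalog name is a Spark-side alias, not a directory on disk.
        parts = parts[1:]
    return warehouse.joinpath(*parts)


# Same relative warehouse path as the session configuration in this chapter.
warehouse = Path("_spark_output") / "lakehouse-formats" / "iceberg-warehouse"
location = hadoop_table_location(warehouse, "local.demo.orders")

print(location)               # expected table directory
print(location / "metadata")  # where metadata files are assumed to live
```

Other catalog types (for example REST or Hive) keep the pointer to the current table metadata in a service rather than in the directory layout, so this mapping applies only to the Hadoop catalog used here.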

1.4. Choosing A Format

Parquet is a file format. Delta Lake and Iceberg are table formats that usually store Parquet data files plus table metadata. Choose plain Parquet for simple immutable extracts. Choose a table format when you need table transactions, snapshots, schema evolution, row-level changes, or multiple engines reading the same data lake table.

[6]:
spark.stop()