3. SparkSQL

SparkSQL allows you to issue SQL statements against Spark DataFrames once they are registered as temporary views.

3.1. Local Spark setup

[ ]:
# Local Spark setup for the book examples.
from pathlib import Path
from pyspark.sql import SparkSession

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "sparksql"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-sparksql")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Alias kept for the older sqlContext-style API used in the cells below;
# in modern PySpark the SparkSession itself plays this role.
sqlContext = spark

3.2. Data

Let’s set up some data first. The JSON files already live with the notebook source.

3.2.1. Locate local data

[ ]:
people_path = DATA_DIR / 'people.json'
television_path = DATA_DIR / 'television.json'
people_path, television_path

3.2.2. Read data and create temporary tables

Before we can issue SQL statements against these DataFrames, we need to register each one as a temporary view with createOrReplaceTempView().

[ ]:
people_df = sqlContext.read.json(str(people_path))
people_df.createOrReplaceTempView('people')

tv_df = sqlContext.read.json(str(television_path))
tv_df.createOrReplaceTempView('tv')

3.2.3. Show schemas

[3]:
people_df.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zip: long (nullable = true)
 |-- age: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- height: double (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- male: boolean (nullable = true)
 |-- sports: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- weight: double (nullable = true)

[4]:
tv_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- person_id: long (nullable = true)

3.3. Select

Now we can use sqlContext.sql() to issue SQL statements against the registered views.

[5]:
sql = """
select *
from people
"""

sqlContext.sql(sql).show()
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|             address|age|first_name|height| id|last_name| male|              sports|weight|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|[Washington, DC, ...| 27|      John|   6.5|  1|      Doe| true|    [hockey, tennis]| 155.5|
|[Washington, DC, ...| 22|      Jane|   5.7|  2|    Smith|false|[basketball, tennis]| 135.5|
|[Los Angeles, CA,...| 25|      Jack|   6.6|  3|    Smith| true|  [baseball, soccer]| 175.5|
|[Los Angeles, CA,...| 18|     Janet|   5.5|  4|      Doe|false|    [judo, baseball]| 125.5|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+

3.4. Count

[6]:
sql = """
select male, count(*) as total
from people
group by male
"""

sqlContext.sql(sql).show()
+-----+-----+
| male|total|
+-----+-----+
| true|    2|
|false|    2|
+-----+-----+

3.5. Nested query

[7]:
sql = """
select first_name, last_name
from (
    select *
    from people
)
"""

sqlContext.sql(sql).show()
+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|    Smith|
|      Jack|    Smith|
|     Janet|      Doe|
+----------+---------+

3.6. Nested fields

[8]:
sql = """
select first_name, last_name, address.city, address.state
from people
"""

sqlContext.sql(sql).show()
+----------+---------+-----------+-----+
|first_name|last_name|       city|state|
+----------+---------+-----------+-----+
|      John|      Doe| Washington|   DC|
|      Jane|    Smith| Washington|   DC|
|      Jack|    Smith|Los Angeles|   CA|
|     Janet|      Doe|Los Angeles|   CA|
+----------+---------+-----------+-----+

3.7. Left join

[9]:
sql = """
select p.id id, first_name, last_name, name
from people p
    left join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name|             name|
+---+----------+---------+-----------------+
|  1|      John|      Doe|  Who's the Boss?|
|  1|      John|      Doe|        House, MD|
|  2|      Jane|    Smith|       Full House|
|  2|      Jane|    Smith|    Facts of Life|
|  2|      Jane|    Smith|Charles in Charge|
|  3|      Jack|    Smith|      Family Ties|
|  3|      Jack|    Smith|      Night Court|
|  3|      Jack|    Smith|           Cheers|
|  3|      Jack|    Smith|       Happy Days|
|  4|     Janet|      Doe|             null|
+---+----------+---------+-----------------+

3.8. Right join

[10]:
sql = """
select p.id id, first_name, last_name, name
from people p
    right join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+----+----------+---------+-----------------+
|  id|first_name|last_name|             name|
+----+----------+---------+-----------------+
|   1|      John|      Doe|        House, MD|
|   1|      John|      Doe|  Who's the Boss?|
|   2|      Jane|    Smith|Charles in Charge|
|   2|      Jane|    Smith|    Facts of Life|
|   2|      Jane|    Smith|       Full House|
|   3|      Jack|    Smith|       Happy Days|
|   3|      Jack|    Smith|           Cheers|
|   3|      Jack|    Smith|      Night Court|
|   3|      Jack|    Smith|      Family Ties|
|null|      null|     null|              ALF|
|null|      null|     null|    Growing Pains|
|null|      null|     null|  Three's Company|
+----+----------+---------+-----------------+

3.9. Inner join

[11]:
sql = """
select p.id id, first_name, last_name, name
from people p
    inner join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name|             name|
+---+----------+---------+-----------------+
|  1|      John|      Doe|        House, MD|
|  1|      John|      Doe|  Who's the Boss?|
|  2|      Jane|    Smith|Charles in Charge|
|  2|      Jane|    Smith|    Facts of Life|
|  2|      Jane|    Smith|       Full House|
|  3|      Jack|    Smith|       Happy Days|
|  3|      Jack|    Smith|           Cheers|
|  3|      Jack|    Smith|      Night Court|
|  3|      Jack|    Smith|      Family Ties|
+---+----------+---------+-----------------+

3.10. User defined function

We can register UDFs and use them in our SQL statements too.

[12]:
from pyspark.sql.types import FloatType

def to_days(years):
    return 365.25 * years

spark.udf.register('toDays', to_days, FloatType())

sql = """
select first_name, last_name, toDays(age) age_days
from people
"""

sqlContext.sql(sql).show()
+----------+---------+--------+
|first_name|last_name|age_days|
+----------+---------+--------+
|      John|      Doe| 9861.75|
|      Jane|    Smith|  8035.5|
|      Jack|    Smith| 9131.25|
|     Janet|      Doe|  6574.5|
+----------+---------+--------+