3. SparkSQL
SparkSQL allows you to issue SQL statements directly against Spark DataFrames once they are registered as temporary views.
3.1. Local Spark setup
[ ]:
# Local Spark setup for the book examples.
from pathlib import Path
from pyspark.sql import SparkSession
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "sparksql"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
builder = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-sparksql")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark  # alias so the legacy sqlContext.sql() calls below keep working
3.2. Data
Let’s set up some data first. The JSON files already live with the notebook source.
3.2.1. Locate local data
[ ]:
people_path = DATA_DIR / 'people.json'
television_path = DATA_DIR / 'television.json'
people_path, television_path
3.2.2. Read data and create temporary tables
Before we can issue SQL statements against these DataFrames, we need to register each one as a temporary view with createOrReplaceTempView().
[ ]:
people_df = sqlContext.read.json(str(people_path))
people_df.createOrReplaceTempView('people')
tv_df = sqlContext.read.json(str(television_path))
tv_df.createOrReplaceTempView('tv')
3.2.3. Show schemas
[3]:
people_df.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
| |-- street: string (nullable = true)
| |-- zip: long (nullable = true)
|-- age: long (nullable = true)
|-- first_name: string (nullable = true)
|-- height: double (nullable = true)
|-- id: long (nullable = true)
|-- last_name: string (nullable = true)
|-- male: boolean (nullable = true)
|-- sports: array (nullable = true)
| |-- element: string (containsNull = true)
|-- weight: double (nullable = true)
[4]:
tv_df.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- person_id: long (nullable = true)
3.3. Select
Now we can use sqlContext.sql() to issue SQL statements against the registered views.
[5]:
sql = """
select *
from people
"""
sqlContext.sql(sql).show()
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
| address|age|first_name|height| id|last_name| male| sports|weight|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|[Washington, DC, ...| 27| John| 6.5| 1| Doe| true| [hockey, tennis]| 155.5|
|[Washington, DC, ...| 22| Jane| 5.7| 2| Smith|false|[basketball, tennis]| 135.5|
|[Los Angeles, CA,...| 25| Jack| 6.6| 3| Smith| true| [baseball, soccer]| 175.5|
|[Los Angeles, CA,...| 18| Janet| 5.5| 4| Doe|false| [judo, baseball]| 125.5|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
3.4. Count
[6]:
sql = """
select male, count(*) as total
from people
group by male
"""
sqlContext.sql(sql).show()
+-----+-----+
| male|total|
+-----+-----+
| true| 2|
|false| 2|
+-----+-----+
3.5. Nested query
[7]:
sql = """
select first_name, last_name
from (
select *
from people
)
"""
sqlContext.sql(sql).show()
+----------+---------+
|first_name|last_name|
+----------+---------+
| John| Doe|
| Jane| Smith|
| Jack| Smith|
| Janet| Doe|
+----------+---------+
3.6. Nested fields
[8]:
sql = """
select first_name, last_name, address.city, address.state
from people
"""
sqlContext.sql(sql).show()
+----------+---------+-----------+-----+
|first_name|last_name| city|state|
+----------+---------+-----------+-----+
| John| Doe| Washington| DC|
| Jane| Smith| Washington| DC|
| Jack| Smith|Los Angeles| CA|
| Janet| Doe|Los Angeles| CA|
+----------+---------+-----------+-----+
3.7. Left join
[9]:
sql = """
select p.id id, first_name, last_name, name
from people p
left join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name| name|
+---+----------+---------+-----------------+
| 1| John| Doe| Who's the Boss?|
| 1| John| Doe| House, MD|
| 2| Jane| Smith| Full House|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith|Charles in Charge|
| 3| Jack| Smith| Family Ties|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Happy Days|
| 4| Janet| Doe| null|
+---+----------+---------+-----------------+
3.8. Right join
[10]:
sql = """
select p.id id, first_name, last_name, name
from people p
right join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+----+----------+---------+-----------------+
| id|first_name|last_name| name|
+----+----------+---------+-----------------+
| 1| John| Doe| House, MD|
| 1| John| Doe| Who's the Boss?|
| 2| Jane| Smith|Charles in Charge|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith| Full House|
| 3| Jack| Smith| Happy Days|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Family Ties|
|null| null| null| ALF|
|null| null| null| Growing Pains|
|null| null| null| Three's Company|
+----+----------+---------+-----------------+
3.9. Inner join
[11]:
sql = """
select p.id id, first_name, last_name, name
from people p
inner join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name| name|
+---+----------+---------+-----------------+
| 1| John| Doe| House, MD|
| 1| John| Doe| Who's the Boss?|
| 2| Jane| Smith|Charles in Charge|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith| Full House|
| 3| Jack| Smith| Happy Days|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Family Ties|
+---+----------+---------+-----------------+
3.10. User defined function
We can register UDFs and use them in our SQL statements too.
[12]:
from pyspark.sql.types import FloatType

def to_days(years):
    return 365.25 * years

spark.udf.register('toDays', to_days, FloatType())
sql = """
select first_name, last_name, toDays(age) age_days
from people
"""
sqlContext.sql(sql).show()
+----------+---------+--------+
|first_name|last_name|age_days|
+----------+---------+--------+
| John| Doe| 9861.75|
| Jane| Smith| 8035.5|
| Jack| Smith| 9131.25|
| Janet| Doe| 6574.5|
+----------+---------+--------+