3. SparkSQL
SparkSQL allows you to use SQL statements directly against a Spark DataFrame.
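As a quick preview of the workflow this section walks through, here is a minimal, self-contained sketch (using a small hypothetical in-memory DataFrame rather than the notebook's JSON data): register a DataFrame as a temporary view, then query it with SQL.
demo_df = spark.createDataFrame([(1, 'ada'), (2, 'bob')], ['id', 'name'])  # hypothetical toy data
demo_df.createOrReplaceTempView('demo')  # expose the DataFrame to SQL under the name 'demo'
spark.sql('select name from demo where id = 1').show()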
3.1. Data
Let’s set up some data first. We will copy JSON data to HDFS.
3.1.1. Copy data to HDFS
[1]:
%%sh
hdfs dfs -copyFromLocal -f /root/ipynb/people.json /people.json
hdfs dfs -copyFromLocal -f /root/ipynb/television.json /television.json
2020-11-07 04:55:40,471 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-11-07 04:55:43,961 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
3.1.2. Read data and create temporary tables
Before we can issue SQL statements against these DataFrames, we need to register each one as a temporary table through registerTempTable().
[2]:
people_df = sqlContext.read.json('hdfs://localhost/people.json')
people_df.registerTempTable('people')
tv_df = sqlContext.read.json('hdfs://localhost/television.json')
tv_df.registerTempTable('tv')
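Note that registerTempTable() has been deprecated since Spark 2.0; on newer versions the equivalent call is createOrReplaceTempView(), e.g.
people_df.createOrReplaceTempView('people')  # same effect as registerTempTable('people') on Spark 2.0+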
3.1.3. Show schemas
[3]:
people_df.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
| |-- street: string (nullable = true)
| |-- zip: long (nullable = true)
|-- age: long (nullable = true)
|-- first_name: string (nullable = true)
|-- height: double (nullable = true)
|-- id: long (nullable = true)
|-- last_name: string (nullable = true)
|-- male: boolean (nullable = true)
|-- sports: array (nullable = true)
| |-- element: string (containsNull = true)
|-- weight: double (nullable = true)
[4]:
tv_df.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- person_id: long (nullable = true)
3.2. Select
Now we can use sqlContext.sql() to issue SQL statements against the DataFrames.
[5]:
sql = """
select *
from people
"""
sqlContext.sql(sql).show()
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
| address|age|first_name|height| id|last_name| male| sports|weight|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|[Washington, DC, ...| 27| John| 6.5| 1| Doe| true| [hockey, tennis]| 155.5|
|[Washington, DC, ...| 22| Jane| 5.7| 2| Smith|false|[basketball, tennis]| 135.5|
|[Los Angeles, CA,...| 25| Jack| 6.6| 3| Smith| true| [baseball, soccer]| 175.5|
|[Los Angeles, CA,...| 18| Janet| 5.5| 4| Doe|false| [judo, baseball]| 125.5|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
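The same query can also be expressed with the DataFrame API; a one-line equivalent:
people_df.select('*').show()  # DataFrame-API equivalent of: select * from people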
3.3. Count
[6]:
sql = """
select male, count(*) as total
from people
group by male
"""
sqlContext.sql(sql).show()
+-----+-----+
| male|total|
+-----+-----+
| true| 2|
|false| 2|
+-----+-----+
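For comparison, a DataFrame-API sketch of the same grouped count (row order in the output is not guaranteed):
# group by the male column, count rows per group, and rename the count column to total
people_df.groupBy('male').count().withColumnRenamed('count', 'total').show()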
3.4. Nested query
[7]:
sql = """
select first_name, last_name
from (
select *
from people
)
"""
sqlContext.sql(sql).show()
+----------+---------+
|first_name|last_name|
+----------+---------+
| John| Doe|
| Jane| Smith|
| Jack| Smith|
| Janet| Doe|
+----------+---------+
3.5. Nested fields
[8]:
sql = """
select first_name, last_name, address.city, address.state
from people
"""
sqlContext.sql(sql).show()
+----------+---------+-----------+-----+
|first_name|last_name| city|state|
+----------+---------+-----------+-----+
| John| Doe| Washington| DC|
| Jane| Smith| Washington| DC|
| Jack| Smith|Los Angeles| CA|
| Janet| Doe|Los Angeles| CA|
+----------+---------+-----------+-----+
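The dotted-path syntax for struct fields works in the DataFrame API as well:
# address.city and address.state drill into the nested address struct
people_df.select('first_name', 'last_name', 'address.city', 'address.state').show()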
3.6. Left join
[9]:
sql = """
select p.id id, first_name, last_name, name
from people p
left join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name| name|
+---+----------+---------+-----------------+
| 1| John| Doe| Who's the Boss?|
| 1| John| Doe| House, MD|
| 2| Jane| Smith| Full House|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith|Charles in Charge|
| 3| Jack| Smith| Family Ties|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Happy Days|
| 4| Janet| Doe| null|
+---+----------+---------+-----------------+
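A DataFrame-API sketch of the same left join, with the join condition spelled out on the id and person_id columns:
people_df.join(tv_df, people_df.id == tv_df.person_id, 'left') \
    .select(people_df.id, 'first_name', 'last_name', 'name') \
    .show()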
3.7. Right join
[10]:
sql = """
select p.id id, first_name, last_name, name
from people p
right join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+----+----------+---------+-----------------+
| id|first_name|last_name| name|
+----+----------+---------+-----------------+
| 1| John| Doe| House, MD|
| 1| John| Doe| Who's the Boss?|
| 2| Jane| Smith|Charles in Charge|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith| Full House|
| 3| Jack| Smith| Happy Days|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Family Ties|
|null| null| null| ALF|
|null| null| null| Growing Pains|
|null| null| null| Three's Company|
+----+----------+---------+-----------------+
3.8. Inner join
[11]:
sql = """
select p.id id, first_name, last_name, name
from people p
inner join tv t on (p.id = t.person_id)
"""
sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name| name|
+---+----------+---------+-----------------+
| 1| John| Doe| House, MD|
| 1| John| Doe| Who's the Boss?|
| 2| Jane| Smith|Charles in Charge|
| 2| Jane| Smith| Facts of Life|
| 2| Jane| Smith| Full House|
| 3| Jack| Smith| Happy Days|
| 3| Jack| Smith| Cheers|
| 3| Jack| Smith| Night Court|
| 3| Jack| Smith| Family Ties|
+---+----------+---------+-----------------+
3.9. User defined function
We can register UDFs and use them in our SQL statements too.
[12]:
from pyspark.sql.types import FloatType

def to_days(years):
    return 365.25 * years

# Register to_days under the SQL name toDays, declaring a FloatType return value.
spark.udf.register('toDays', to_days, FloatType())
sql = """
select first_name, last_name, toDays(age) age_days
from people
"""
sqlContext.sql(sql).show()
+----------+---------+--------+
|first_name|last_name|age_days|
+----------+---------+--------+
| John| Doe| 9861.75|
| Jane| Smith| 8035.5|
| Jack| Smith| 9131.25|
| Janet| Doe| 6574.5|
+----------+---------+--------+
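The same conversion can also be used from the DataFrame API by wrapping the Python function with pyspark.sql.functions.udf (a sketch):
from pyspark.sql.functions import udf

to_days_udf = udf(to_days, FloatType())  # column-level wrapper around the same function
people_df.select('first_name', 'last_name', to_days_udf('age').alias('age_days')).show()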