3. Spark SQL

Spark SQL lets you run SQL statements against Spark DataFrames once they have been registered under a table name.

3.1. Data

Let’s set up some data first. We will copy two JSON files, people.json and television.json, from the local filesystem to HDFS.

3.1.1. Copy data to HDFS

[1]:
%%sh
hdfs dfs -copyFromLocal -f /root/ipynb/people.json /people.json
hdfs dfs -copyFromLocal -f /root/ipynb/television.json /television.json
2020-11-07 04:55:40,471 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-11-07 04:55:43,961 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

3.1.2. Read data and create temporary tables

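Before we can issue SQL statements against these DataFrames, we need to register each one under a table name. registerTempTable() has been deprecated since Spark 2.0, so we use its replacement, createOrReplaceTempView().

[2]:
# Read each JSON file from HDFS into a DataFrame and expose it to SQL.
people_df = sqlContext.read.json('hdfs://localhost/people.json')
people_df.createOrReplaceTempView('people')

tv_df = sqlContext.read.json('hdfs://localhost/television.json')
tv_df.createOrReplaceTempView('tv')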

3.1.3. Show schemas

[3]:
people_df.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zip: long (nullable = true)
 |-- age: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- height: double (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- male: boolean (nullable = true)
 |-- sports: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- weight: double (nullable = true)

[4]:
tv_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- person_id: long (nullable = true)
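
By default, read.json() expects one complete JSON object per line (the JSON Lines layout) rather than a single pretty-printed document; the multiLine option covers the other case. The sketch below builds one record shaped like the people schema above and serializes it to a single line. The name, age, height, weight, sports, city and state values are taken from the query output in section 3.2; the street and zip values are truncated there, so the ones used here are hypothetical placeholders.

```python
import json

# One record shaped like the people schema printed above.
record = {
    "id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "age": 27,
    "height": 6.5,
    "weight": 155.5,
    "male": True,
    "sports": ["hockey", "tennis"],
    "address": {
        "street": "PLACEHOLDER STREET",  # hypothetical, not from the source data
        "city": "Washington",
        "state": "DC",
        "zip": 0,                        # hypothetical, not from the source data
    },
}

# JSON Lines: json.dumps() without indent keeps the whole record on one line.
line = json.dumps(record)

# Parsing the line back gives the nested dict that shows up as the
# address struct in the schema.
parsed = json.loads(line)
print(parsed["address"]["city"])  # Washington

# The top-level keys, sorted, match the field order printSchema() shows.
print(sorted(parsed))
```

The nested address object is what appears as the address struct in the schema, and the sports list is what appears as an array of strings.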

3.2. Select

Now we can use sqlContext.sql() to issue SQL statements against the registered tables. The call returns a new DataFrame, so we can chain show() to print the result.

[5]:
sql = """
select *
from people
"""

sqlContext.sql(sql).show()
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|             address|age|first_name|height| id|last_name| male|              sports|weight|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+
|[Washington, DC, ...| 27|      John|   6.5|  1|      Doe| true|    [hockey, tennis]| 155.5|
|[Washington, DC, ...| 22|      Jane|   5.7|  2|    Smith|false|[basketball, tennis]| 135.5|
|[Los Angeles, CA,...| 25|      Jack|   6.6|  3|    Smith| true|  [baseball, soccer]| 175.5|
|[Los Angeles, CA,...| 18|     Janet|   5.5|  4|      Doe|false|    [judo, baseball]| 125.5|
+--------------------+---+----------+------+---+---------+-----+--------------------+------+

3.3. Count

[6]:
sql = """
select male, count(*) as total
from people
group by male
"""

sqlContext.sql(sql).show()
+-----+-----+
| male|total|
+-----+-----+
| true|    2|
|false|    2|
+-----+-----+

3.4. Nested query

[7]:
sql = """
select first_name, last_name
from (
    select *
    from people
)
"""

sqlContext.sql(sql).show()
+----------+---------+
|first_name|last_name|
+----------+---------+
|      John|      Doe|
|      Jane|    Smith|
|      Jack|    Smith|
|     Janet|      Doe|
+----------+---------+

3.5. Nested fields

[8]:
sql = """
select first_name, last_name, address.city, address.state
from people
"""

sqlContext.sql(sql).show()
+----------+---------+-----------+-----+
|first_name|last_name|       city|state|
+----------+---------+-----------+-----+
|      John|      Doe| Washington|   DC|
|      Jane|    Smith| Washington|   DC|
|      Jack|    Smith|Los Angeles|   CA|
|     Janet|      Doe|Los Angeles|   CA|
+----------+---------+-----------+-----+

3.6. Left join

[9]:
sql = """
select p.id id, first_name, last_name, name
from people p
    left join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name|             name|
+---+----------+---------+-----------------+
|  1|      John|      Doe|  Who's the Boss?|
|  1|      John|      Doe|        House, MD|
|  2|      Jane|    Smith|       Full House|
|  2|      Jane|    Smith|    Facts of Life|
|  2|      Jane|    Smith|Charles in Charge|
|  3|      Jack|    Smith|      Family Ties|
|  3|      Jack|    Smith|      Night Court|
|  3|      Jack|    Smith|           Cheers|
|  3|      Jack|    Smith|       Happy Days|
|  4|     Janet|      Doe|             null|
+---+----------+---------+-----------------+

3.7. Right join

[10]:
sql = """
select p.id id, first_name, last_name, name
from people p
    right join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+----+----------+---------+-----------------+
|  id|first_name|last_name|             name|
+----+----------+---------+-----------------+
|   1|      John|      Doe|        House, MD|
|   1|      John|      Doe|  Who's the Boss?|
|   2|      Jane|    Smith|Charles in Charge|
|   2|      Jane|    Smith|    Facts of Life|
|   2|      Jane|    Smith|       Full House|
|   3|      Jack|    Smith|       Happy Days|
|   3|      Jack|    Smith|           Cheers|
|   3|      Jack|    Smith|      Night Court|
|   3|      Jack|    Smith|      Family Ties|
|null|      null|     null|              ALF|
|null|      null|     null|    Growing Pains|
|null|      null|     null|  Three's Company|
+----+----------+---------+-----------------+

3.8. Inner join

[11]:
sql = """
select p.id id, first_name, last_name, name
from people p
    inner join tv t on (p.id = t.person_id)
"""

sqlContext.sql(sql).show()
+---+----------+---------+-----------------+
| id|first_name|last_name|             name|
+---+----------+---------+-----------------+
|  1|      John|      Doe|        House, MD|
|  1|      John|      Doe|  Who's the Boss?|
|  2|      Jane|    Smith|Charles in Charge|
|  2|      Jane|    Smith|    Facts of Life|
|  2|      Jane|    Smith|       Full House|
|  3|      Jack|    Smith|       Happy Days|
|  3|      Jack|    Smith|           Cheers|
|  3|      Jack|    Smith|      Night Court|
|  3|      Jack|    Smith|      Family Ties|
+---+----------+---------+-----------------+
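
The three join types above differ only in which unmatched rows they keep. The following plain-Python sketch reproduces the row counts of the three outputs; it illustrates the join semantics only and is not how Spark executes joins. The people and show names come from the outputs above. The person_id of the three unmatched shows is not visible in those outputs, so None is an assumption.

```python
# (id, first_name, last_name), taken from the query outputs above.
people = [(1, 'John', 'Doe'), (2, 'Jane', 'Smith'),
          (3, 'Jack', 'Smith'), (4, 'Janet', 'Doe')]

# (person_id, name). None for the last three rows is an assumption.
tv = [
    (1, "Who's the Boss?"), (1, 'House, MD'),
    (2, 'Full House'), (2, 'Facts of Life'), (2, 'Charles in Charge'),
    (3, 'Family Ties'), (3, 'Night Court'), (3, 'Cheers'), (3, 'Happy Days'),
    (None, 'ALF'), (None, 'Growing Pains'), (None, "Three's Company"),
]

def inner_join(people, tv):
    # Keep only the pairs where the join condition holds.
    return [(pid, first, last, name)
            for pid, first, last in people
            for person_id, name in tv
            if pid == person_id]

def left_join(people, tv):
    # Inner join plus every person with no show (name becomes None).
    rows = inner_join(people, tv)
    matched = {row[0] for row in rows}
    rows += [(pid, first, last, None)
             for pid, first, last in people if pid not in matched]
    return rows

def right_join(people, tv):
    # Inner join plus every show with no person (person columns become None).
    rows = inner_join(people, tv)
    people_ids = {pid for pid, _, _ in people}
    rows += [(None, None, None, name)
             for person_id, name in tv if person_id not in people_ids]
    return rows

inner_rows = inner_join(people, tv)
left_rows = left_join(people, tv)
right_rows = right_join(people, tv)
print(len(inner_rows), len(left_rows), len(right_rows))  # 9 10 12
```

The left join adds the one person without a show (Janet), the right join adds the three shows without a person, and the inner join keeps neither. Row order is not part of the join semantics, which is why the Spark outputs above list the same rows in different orders.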

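3.9. User-defined function

We can also register user-defined functions (UDFs) and call them in our SQL statements.

[12]:
from pyspark.sql.types import FloatType

def to_days(years):
    # Convert an age in years to days, using 365.25 days per year.
    return 365.25 * years

# Register the Python function under the SQL name toDays.
spark.udf.register('toDays', to_days, FloatType())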

sql = """
select first_name, last_name, toDays(age) age_days
from people
"""

sqlContext.sql(sql).show()
+----------+---------+--------+
|first_name|last_name|age_days|
+----------+---------+--------+
|      John|      Doe| 9861.75|
|      Jane|    Smith|  8035.5|
|      Jack|    Smith| 9131.25|
|     Janet|      Doe|  6574.5|
+----------+---------+--------+
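
The age column is nullable, and Spark passes a SQL NULL to a Python UDF as None, for which the multiplication in to_days would raise a TypeError. The data shown here has no null ages, so the query above runs fine. A null-safe variant, as a minimal sketch that can be checked without Spark, might look like this:

```python
def to_days(years):
    # Return None for a missing age so the SQL result is NULL
    # instead of the UDF raising a TypeError.
    if years is None:
        return None
    return 365.25 * years

print(to_days(27))    # 9861.75
print(to_days(None))  # None
```

The function is registered the same way as before. Note that FloatType is a 32-bit type while Python floats are 64-bit; registering with DoubleType instead avoids rounding for values that a 32-bit float cannot represent exactly.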