1. Input/Output

Reading and writing data is fundamental to any Spark job. Let’s see how to do both with Spark.

1.1. Local Spark setup

[ ]:
# Local Spark setup for the book examples.
from pathlib import Path
from pyspark.sql import SparkSession

DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "io"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("spark-intro-io")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sqlContext = spark  # alias under the legacy SQLContext name used in the examples below

1.2. Data

Let’s create some mock data to illustrate the I/O operations.

[1]:
import pandas as pd
from random import randint, choice

def get_record(idx, n_cols):
    gender = choice(['male', 'female'])
    data = [idx, gender] + [randint(1, 100) for _ in range(n_cols)]
    return tuple(data)

n_cols = 10
n_rows = 10

data = [get_record(i, n_cols) for i in range(n_rows)]
columns = ['id', 'gender'] + [f'x{i}' for i in range(n_cols)]

df = sqlContext.createDataFrame(pd.DataFrame(data, columns=columns))
[2]:
df.show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
|  0|female| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|
|  1|  male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|  2|female|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|
|  3|female| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|
|  4|female| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|
|  5|female| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|
|  6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|  7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
|  8|  male| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|
|  9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+

[3]:
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- x0: long (nullable = true)
 |-- x1: long (nullable = true)
 |-- x2: long (nullable = true)
 |-- x3: long (nullable = true)
 |-- x4: long (nullable = true)
 |-- x5: long (nullable = true)
 |-- x6: long (nullable = true)
 |-- x7: long (nullable = true)
 |-- x8: long (nullable = true)
 |-- x9: long (nullable = true)

1.3. Writing data

1.3.1. CSV

When we want to write CSV data, we specify the csv format on the DataFrameWriter.

[ ]:
df.write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save(str(OUTPUT_DIR / 'data.csv'))

Note that multiple CSV files are written, one per partition, since the data is written out in parallel. The data.csv path we passed to save() is actually a directory containing the part files.

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.csv').iterdir())
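
One part file is written per partition of the DataFrame, so the number of files listed above simply reflects the partition count, which we can check directly:

[ ]:
# One part file is written per partition; the exact count depends on
# the local configuration above.
df.rdd.getNumPartitions()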

1.3.2. As one CSV file

If we want to write only one file, we first need to collapse the data into a single partition.

[ ]:
df.repartition(1).write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save(str(OUTPUT_DIR / 'data.csv'))

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.csv').iterdir())
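
As a sketch of an alternative, coalesce(1) also collapses the data into a single partition, but narrows the existing partitions instead of triggering a full shuffle; the trade-off is that one task performs the entire write. The data_single.csv directory below is a hypothetical name chosen to leave the previous output intact.

[ ]:
# coalesce(1) merges the existing partitions into one without a full
# shuffle; the single remaining task writes one part file.
df.coalesce(1).write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save(str(OUTPUT_DIR / 'data_single.csv'))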

1.3.3. JSON

Writing data as JSON is accomplished by specifying the json format. Spark writes JSON Lines: each row becomes a JSON object on its own line.

[ ]:
df.write\
    .format('json')\
    .mode('overwrite')\
    .save(str(OUTPUT_DIR / 'data.json'))

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.json').iterdir())
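
We can verify the JSON Lines layout by peeking at the raw output we just wrote:

[ ]:
# Each line of the output is a self-contained JSON object.
sc.textFile(str(OUTPUT_DIR / 'data.json')).take(1)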

1.3.4. Parquet

Parquet files are also easy to create.

[ ]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .save(str(OUTPUT_DIR / 'data.parquet'))

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet').iterdir())

1.3.5. Parquet with partitions

If we want to create partitioned Parquet files, we specify which field to partition by with partitionBy(). Spark creates one subdirectory per distinct value of that field.

[ ]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .partitionBy('gender')\
    .save(str(OUTPUT_DIR / 'data.parquet'))

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet').iterdir())

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet' / 'gender=female').iterdir())

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet' / 'gender=male').iterdir())
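
The payoff of partitioning comes at read time: a filter on the partition column lets Spark skip the other subdirectories entirely (partition pruning). A minimal sketch:

[ ]:
# The filter on gender prunes the read to the gender=male directory;
# the gender=female files are never touched.
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet'))\
    .where("gender = 'male'")\
    .show()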

1.3.6. ORC

ORC files are created by specifying the ORC file format.

[ ]:
df.write\
    .format('orc')\
    .mode('overwrite')\
    .save(str(OUTPUT_DIR / 'data.orc'))

[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.orc').iterdir())

1.4. Reading data

1.4.1. Whole text file

The function wholeTextFiles() reads entire files into an RDD of 2-tuples, where the first element is the path of a file and the second element is that file’s entire content. Passing a directory reads every file in it; wildcard patterns such as * can also be used to match more than one file (see the sketch after the code).

[ ]:
pair_rdd = sc.wholeTextFiles(str(OUTPUT_DIR / 'data.csv'))

item = pair_rdd.collect()[0]

print(f'item[0] = {item[0]}')
print(f'item[1][0:90] = {item[1][0:90]}')
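
As a sketch of the wildcard usage, a glob pattern such as part-* matches only the part files Spark generated and skips bookkeeping files like _SUCCESS:

[ ]:
# Glob patterns are accepted in the path.
pair_rdd = sc.wholeTextFiles(str(OUTPUT_DIR / 'data.csv' / 'part-*'))
pair_rdd.count()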

1.4.2. Text file by lines

To read a file line by line, use textFile(). An RDD of strings, one per line, is returned.

[ ]:
rdd = sc.textFile(str(OUTPUT_DIR / 'data.csv'))
rdd.take(5)
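
Since textFile() yields raw strings, turning the CSV back into records takes manual work. This sketch splits each line on commas and drops the header rows by hand, which is exactly what the DataFrame reader in the next section does for us:

[ ]:
# Keep every line that is not a header (each part file repeats the
# header), then split the remaining lines into fields.
header = rdd.first()
records = rdd.filter(lambda line: line != header)\
             .map(lambda line: line.split(','))
records.take(2)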

1.4.3. CSV

We can also read CSV files back into a DataFrame. The header option treats the first row as column names, and inferSchema has Spark sample the data to determine column types.

[ ]:
spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load(str(OUTPUT_DIR / 'data.csv'))\
    .show()
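
Schema inference costs an extra pass over the data. When the schema is known up front, we can supply it instead; here is a sketch using a DDL-style schema string built from the column names we generated earlier:

[ ]:
# An explicit schema skips the inference pass over the data.
schema = 'id BIGINT, gender STRING, ' + ', '.join(f'x{i} INT' for i in range(n_cols))
spark.read.format('csv')\
    .option('header', 'true')\
    .schema(schema)\
    .load(str(OUTPUT_DIR / 'data.csv'))\
    .show()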

1.4.4. JSON

JSON files are read as shown below. Note that schema inference happens automatically for JSON by sampling the data, so no inferSchema option is needed.

[ ]:
spark.read.format('json')\
    .load(str(OUTPUT_DIR / 'data.json'))\
    .show()

1.4.5. Parquet

Parquet files are easy to read; the schema is stored in the files themselves, so nothing needs to be inferred.

[ ]:
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet')).show()
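
Because Parquet is a columnar format, selecting a subset of columns means only those columns are read from disk. A quick sketch:

[ ]:
# Only the id and gender columns are read from the Parquet files.
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet'))\
    .select('id', 'gender')\
    .show()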

1.4.6. ORC

Reading ORC files is just as straightforward.

[ ]:
spark.read.orc(str(OUTPUT_DIR / 'data.orc')).show()
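
Finally, if this notebook is run standalone, we can stop the local session to release its resources; skip this if you plan to keep experimenting.

[ ]:
spark.stop()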