1. Input/Output
Reading and writing data is fundamental to any Spark workflow. This section walks through writing and reading data in the most common formats.
1.1. Local Spark setup
[ ]:
# Local Spark setup for the book examples.
from pathlib import Path
from pyspark.sql import SparkSession
DATA_DIR = Path.cwd()
OUTPUT_DIR = DATA_DIR / "_spark_output" / "io"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
builder = (
SparkSession.builder
.master("local[*]")
.appName("spark-intro-io")
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.default.parallelism", "4")
.config("spark.sql.adaptive.enabled", "false")
)
spark = builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
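As a quick sanity check that the local session is up, we can print the Spark version and the default parallelism (the values will vary with your installation):
[ ]:
print(spark.version)
print(sc.defaultParallelism)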
1.2. Data
Let's create some mock data to illustrate the I/O operations.
[1]:
import pandas as pd
from random import randint, choice
def get_record(idx, n_cols):
    """Create one record: an id, a random gender, and n_cols random integers."""
    gender = choice(['male', 'female'])
    data = [idx, gender] + [randint(1, 100) for _ in range(n_cols)]
    return tuple(data)

n_cols = 10
n_rows = 10
data = [get_record(i, n_cols) for i in range(n_rows)]
columns = ['id', 'gender'] + [f'x{i}' for i in range(n_cols)]
df = spark.createDataFrame(pd.DataFrame(data, columns=columns))
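Going through pandas is just a convenience here; the same DataFrame could be built directly from the list of tuples, letting Spark infer the column types (a minimal sketch; df_direct is an illustrative name):
[ ]:
# Equivalent construction without pandas; Spark infers the types from the data.
df_direct = spark.createDataFrame(data, schema=columns)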
[2]:
df.show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
| 0|female| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|
| 1| male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
| 2|female| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|
| 3|female| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|
| 4|female| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|
| 5|female| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|
| 6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
| 7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
| 8| male| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52|
| 9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+
[3]:
df.printSchema()
root
|-- id: long (nullable = true)
|-- gender: string (nullable = true)
|-- x0: long (nullable = true)
|-- x1: long (nullable = true)
|-- x2: long (nullable = true)
|-- x3: long (nullable = true)
|-- x4: long (nullable = true)
|-- x5: long (nullable = true)
|-- x6: long (nullable = true)
|-- x7: long (nullable = true)
|-- x8: long (nullable = true)
|-- x9: long (nullable = true)
1.3. Writing data
1.3.1. CSV
To write a CSV file, we specify the csv format on the DataFrame's write interface.
[ ]:
df.write\
.format('csv')\
.mode('overwrite')\
.option('header', 'true')\
.save(str(OUTPUT_DIR / 'data.csv'))
Note that multiple CSV part files are written, one per partition, because the data is written out in parallel.
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.csv').iterdir())
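The number of part files matches the number of partitions of the DataFrame, which we can verify:
[ ]:
# One part file is written per partition.
df.rdd.getNumPartitions()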
1.3.2. As one CSV file
If we want a single output file, we first need to collapse the data into one partition.
[ ]:
df.repartition(1).write\
.format('csv')\
.mode('overwrite')\
.option('header', 'true')\
.save(str(OUTPUT_DIR / 'data.csv'))
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.csv').iterdir())
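coalesce(1) produces the same single-file result and, unlike repartition(1), avoids a full shuffle, so it is usually the better choice for this pattern (a minimal sketch):
[ ]:
# coalesce() narrows the partition count without shuffling the data.
df.coalesce(1).write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save(str(OUTPUT_DIR / 'data.csv'))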
1.3.3. JSON
Writing data as JSON is accomplished by specifying the JSON format.
[ ]:
df.write\
.format('json')\
.mode('overwrite')\
.save(str(OUTPUT_DIR / 'data.json'))
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.json').iterdir())
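The writer also supports compressed output via the compression option; for example, gzip (a sketch, writing to data.json.gz, an illustrative directory name):
[ ]:
# Each part file is written as gzip-compressed JSON.
df.write\
    .format('json')\
    .mode('overwrite')\
    .option('compression', 'gzip')\
    .save(str(OUTPUT_DIR / 'data.json.gz'))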
1.3.4. Parquet
Parquet files are also easy to create.
[ ]:
df.write\
.format('parquet')\
.mode('overwrite')\
.save(str(OUTPUT_DIR / 'data.parquet'))
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet').iterdir())
1.3.5. Parquet with partitions
To create partitioned Parquet files, we specify the column to partition by with partitionBy(). Spark writes one subdirectory per distinct value of that column.
[ ]:
df.write\
.format('parquet')\
.mode('overwrite')\
.partitionBy('gender')\
.save(str(OUTPUT_DIR / 'data.parquet'))
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet').iterdir())
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet' / 'gender=female').iterdir())
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.parquet' / 'gender=male').iterdir())
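One payoff of partitioning: when reading the data back, a filter on the partition column lets Spark skip the non-matching directories entirely (partition pruning). A quick sketch:
[ ]:
# Only the gender=male directory is scanned for this query.
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet'))\
    .filter("gender = 'male'")\
    .show()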
1.3.6. ORC
ORC files are created by specifying the ORC file format.
[ ]:
df.write\
.format('orc')\
.mode('overwrite')\
.save(str(OUTPUT_DIR / 'data.orc'))
[ ]:
sorted(p.name for p in (OUTPUT_DIR / 'data.orc').iterdir())
1.4. Reading
1.4.1. Whole text file
The function wholeTextFiles() reads entire files into an RDD of 2-tuples, where the first element is the path of the file and the second is the file's full content. When given a directory, it reads every file inside; glob patterns such as * can also be used to match multiple paths.
[ ]:
pair_rdd = sc.wholeTextFiles(str(OUTPUT_DIR / 'data.csv'))
item = pair_rdd.collect()[0]
print(f'item[0] = {item[0]}')
print(f'item[1][0:90] = {item[1][0:90]}')
1.4.2. Text file by lines
To read a file line by line, use textFile(), which returns an RDD of strings.
[ ]:
rdd = sc.textFile(str(OUTPUT_DIR / 'data.csv'))
rdd.take(5)
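Since textFile() returns raw strings, parsing is up to us. A minimal sketch that drops header lines and splits the remaining rows on commas (this assumes no quoted fields, which holds for our mock data):
[ ]:
header = rdd.first()
rows = rdd.filter(lambda line: line != header)\
    .map(lambda line: line.split(','))
rows.take(2)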
1.4.3. CSV
We can also read CSV files back into a DataFrame. Here we treat the first row as a header and let Spark infer the column types.
[ ]:
spark.read.format('csv')\
.option('header', 'true')\
.option('inferSchema', 'true')\
.load(str(OUTPUT_DIR / 'data.csv'))\
.show()
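Schema inference costs an extra pass over the data, so for stable pipelines you may prefer declaring the schema up front. A sketch using a DDL-style schema string matching our mock data:
[ ]:
# An explicit schema avoids the inference pass and guarantees column types.
schema = 'id BIGINT, gender STRING, ' + ', '.join(f'x{i} INT' for i in range(10))
spark.read.format('csv')\
    .option('header', 'true')\
    .schema(schema)\
    .load(str(OUTPUT_DIR / 'data.csv'))\
    .show()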
1.4.4. JSON
JSON files are read similarly. Note that the JSON reader infers the schema by default, so no inferSchema option is needed.
[ ]:
spark.read.format('json')\
    .load(str(OUTPUT_DIR / 'data.json'))\
    .show()
1.4.5. Parquet
Parquet files are easy to read.
[ ]:
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet')).show()
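Because Parquet is columnar, selecting a subset of columns means Spark reads only those columns from disk (a quick illustration):
[ ]:
# Only the requested columns are read from the Parquet files.
spark.read.parquet(str(OUTPUT_DIR / 'data.parquet'))\
    .select('id', 'x0')\
    .show()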
1.4.6. ORC
Reading ORC files is just as straightforward.
[ ]:
spark.read.orc(str(OUTPUT_DIR / 'data.orc')).show()
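When you're done experimenting, shut the session down to release local resources:
[ ]:
spark.stop()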