4. Input/Output

Reading and writing data is a fundamental part of any Spark job. Let’s see how to write and read data in several common formats.

4.1. Data

First, let’s create some mock data to illustrate the I/O operations.

[1]:
import pandas as pd
from random import randint, choice

def get_record(idx, n_cols):
    gender = choice(['male', 'female'])
    data = [idx, gender] + [randint(1, 100) for _ in range(n_cols)]
    return tuple(data)

n_cols = 10
n_rows = 10

data = [get_record(i, n_cols) for i in range(n_rows)]
columns = ['id', 'gender'] + [f'x{i}' for i in range(n_cols)]

df = spark.createDataFrame(pd.DataFrame(data, columns=columns))
[2]:
df.show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
|  0|female| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|
|  1|  male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|  2|female|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|
|  3|female| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|
|  4|female| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|
|  5|female| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|
|  6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|  7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
|  8|  male| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|
|  9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+

[3]:
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- x0: long (nullable = true)
 |-- x1: long (nullable = true)
 |-- x2: long (nullable = true)
 |-- x3: long (nullable = true)
 |-- x4: long (nullable = true)
 |-- x5: long (nullable = true)
 |-- x6: long (nullable = true)
 |-- x7: long (nullable = true)
 |-- x8: long (nullable = true)
 |-- x9: long (nullable = true)

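Type inference here went through pandas. If we want full control over the column types, we can pass an explicit schema instead. A minimal sketch, reusing data and n_cols from above:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# build a schema matching the mock data: id, gender, then x0..x9
schema = StructType(
    [StructField('id', LongType(), True),
     StructField('gender', StringType(), True)] +
    [StructField(f'x{i}', LongType(), True) for i in range(n_cols)])

df = spark.createDataFrame(data, schema)
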
4.2. Writing data

4.2.1. CSV

When we want to write a CSV file, we have to specify the format. Since Spark 2.0 the csv source is built in; the legacy name com.databricks.spark.csv from the Databricks CSV package still works as an alias.

[4]:
df.write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')

Note that multiple part files are written, since each partition of the DataFrame is written out in parallel.

[5]:
%%sh
hdfs dfs -ls /user/root/data.csv | awk '{print $8}'

/user/root/data.csv/_SUCCESS
/user/root/data.csv/part-00000-b435899e-e687-467a-9182-b6f7753db8b0-c000.csv
/user/root/data.csv/part-00001-b435899e-e687-467a-9182-b6f7753db8b0-c000.csv

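The CSV writer supports more options than header. A sketch writing tab-separated, gzip-compressed output (the path data_tsv.csv is just for illustration):

# write tab-separated, gzip-compressed part files
df.write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .option('sep', '\t')\
    .option('compression', 'gzip')\
    .save('/user/root/data_tsv.csv')
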
4.2.2. As one CSV file

If we want to write only one file, we first need to repartition the data into a single partition.

[6]:
df.repartition(1).write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')
[7]:
%%sh
hdfs dfs -ls /user/root/data.csv | awk '{print $8}'

/user/root/data.csv/_SUCCESS
/user/root/data.csv/part-00000-c15d5a89-dcf8-4815-b075-e5a1af1cd2e4-c000.csv

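An alternative to repartition(1) is coalesce(1), which merges the existing partitions without a full shuffle. A sketch:

# coalesce(1) avoids the full shuffle that repartition(1) triggers
df.coalesce(1).write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')

Either way, funneling everything through one partition gives up parallelism, so this is only advisable for small outputs.
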
4.2.3. JSON

Writing data as JSON is accomplished by specifying the json format. Spark writes JSON Lines output: one JSON object per line.

[8]:
df.write\
    .format('json')\
    .mode('overwrite')\
    .save('/user/root/data.json')
[9]:
%%sh
hdfs dfs -ls /user/root/data.json | awk '{print $8}'

/user/root/data.json/_SUCCESS
/user/root/data.json/part-00000-b7e4462b-99cb-4cba-b430-2738cd8f8f35-c000.json
/user/root/data.json/part-00001-b7e4462b-99cb-4cba-b430-2738cd8f8f35-c000.json

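The JSON writer also accepts a compression option. A sketch writing gzip-compressed output (the path data_gz.json is just for illustration):

# write gzip-compressed JSON part files
df.write\
    .format('json')\
    .mode('overwrite')\
    .option('compression', 'gzip')\
    .save('/user/root/data_gz.json')
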
4.2.4. Parquet

Parquet files are also easy to create. Note the .snappy.parquet extensions below: Spark compresses Parquet with Snappy by default.

[10]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .save('/user/root/data.parquet')
[11]:
%%sh
hdfs dfs -ls /user/root/data.parquet | awk '{print $8}'

/user/root/data.parquet/_SUCCESS
/user/root/data.parquet/part-00000-cd6f0ab1-b720-4e23-9a85-1b11eec25ff3-c000.snappy.parquet
/user/root/data.parquet/part-00001-cd6f0ab1-b720-4e23-9a85-1b11eec25ff3-c000.snappy.parquet

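DataFrameWriter also exposes shorthand methods that bundle the format and common options; the calls below are equivalent to the format(...).save(...) style used so far. A sketch:

# shorthand writer methods; mode and options become keyword arguments
df.write.parquet('/user/root/data.parquet', mode='overwrite')
df.write.json('/user/root/data.json', mode='overwrite')
df.write.csv('/user/root/data.csv', mode='overwrite', header=True)
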
4.2.5. Parquet with partitions

If we want to create partitioned Parquet files, we need to specify which field to partition by.

[12]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .partitionBy('gender')\
    .save('/user/root/data.parquet')
[13]:
%%sh
hdfs dfs -ls /user/root/data.parquet | awk '{print $8}'

/user/root/data.parquet/_SUCCESS
/user/root/data.parquet/gender=female
/user/root/data.parquet/gender=male
[14]:
%%sh
hdfs dfs -ls /user/root/data.parquet/gender=female | awk '{print $8}'

/user/root/data.parquet/gender=female/part-00000-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
/user/root/data.parquet/gender=female/part-00001-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
[15]:
%%sh
hdfs dfs -ls /user/root/data.parquet/gender=male | awk '{print $8}'

/user/root/data.parquet/gender=male/part-00000-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
/user/root/data.parquet/gender=male/part-00001-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet

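Notice that each gender directory still holds multiple part files. If we repartition by the same column before writing, all rows for a given value typically land in a single partition, which yields one part file per directory. A sketch:

# colocate rows by gender so each partition directory gets one part file
df.repartition('gender').write\
    .format('parquet')\
    .mode('overwrite')\
    .partitionBy('gender')\
    .save('/user/root/data.parquet')
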
4.2.6. ORC

ORC files are created by specifying the ORC file format.

[16]:
df.write\
    .format('orc')\
    .mode('overwrite')\
    .save('/user/root/data.orc')
[17]:
%%sh
hdfs dfs -ls /user/root/data.orc | awk '{print $8}'

/user/root/data.orc/_SUCCESS
/user/root/data.orc/part-00000-c0736144-12b8-4248-ab6b-9b1e18de48b8-c000.snappy.orc
/user/root/data.orc/part-00001-c0736144-12b8-4248-ab6b-9b1e18de48b8-c000.snappy.orc

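As with Parquet, the .snappy.orc extensions show that Snappy compression is the default. The codec can be changed with the compression option; a sketch using zlib (the path data_zlib.orc is just for illustration):

# write zlib-compressed ORC files
df.write\
    .format('orc')\
    .mode('overwrite')\
    .option('compression', 'zlib')\
    .save('/user/root/data_zlib.orc')
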
4.3. Reading

4.3.1. Whole text file

The function wholeTextFiles() reads each file’s entire contents into an RDD of 2-tuples, where the first element is the path of the file and the second element is the full content of the file. Note how we can use the wildcard * to read more than one file.

[18]:
pair_rdd = sc.wholeTextFiles('hdfs://localhost/*.csv')

item = pair_rdd.collect()[0]

print(f'item[0] = {item[0]}')
print(f'item[1][0:90] = {item[1][0:90]}')
item[0] = hdfs://localhost/data.csv
item[1][0:90] = x0,x1,x2,x3,x4,x5,x6,x7,x8,x9
14,22,25,63,47,52,13,14,23,27
35,80,38,28,73,69,21,16,76,53

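Since each element pairs a path with that file’s full content, we can recover individual lines with a flatMap. A sketch:

# split each file's content into lines, discarding the paths
lines = pair_rdd.flatMap(lambda kv: kv[1].splitlines())
lines.take(3)
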
4.3.2. Text file by lines

If we want to read a file line by line, we use textFile(), which returns an RDD of strings.

[19]:
rdd = sc.textFile('hdfs://localhost/data.csv')
rdd.take(5)
[19]:
['x0,x1,x2,x3,x4,x5,x6,x7,x8,x9',
 '14,22,25,63,47,52,13,14,23,27',
 '35,80,38,28,73,69,21,16,76,53',
 '46,37,46,55,78,68,61,62,81,82',
 '19,12,45,50,71,63,94,7,10,77']

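From here, parsing is up to us. A minimal sketch that drops the header and converts each remaining line into a list of integers (safe here because every value in this file is numeric):

# drop the header, then split and cast each line
header = rdd.first()
records = rdd.filter(lambda line: line != header)\
    .map(lambda line: [int(v) for v in line.split(',')])
records.take(2)
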
4.3.3. CSV

We can also read CSV files.

[20]:
spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/user/root/data.csv')\
    .show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
|  0|female| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|
|  1|  male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|  2|female|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|
|  3|female| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|
|  4|female| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|
|  5|female| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|
|  6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|  7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
|  8|  male| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|
|  9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+

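With inferSchema, Spark makes an extra pass over the file to guess the column types. For large inputs it is cheaper to declare the schema up front; a sketch, assuming Spark 2.3+ where schema() accepts a DDL-formatted string (older versions require a StructType):

# declare the schema instead of inferring it
ddl = 'id LONG, gender STRING, ' + ', '.join(f'x{i} LONG' for i in range(10))

spark.read.format('csv')\
    .option('header', 'true')\
    .schema(ddl)\
    .load('/user/root/data.csv')\
    .show()
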
4.3.4. JSON

JSON files are read as shown below.

[21]:
spark.read.format('json')\
    .option('inferSchema', 'true')\
    .load('/user/root/data.json')\
    .show()
+------+---+---+---+---+---+---+---+---+---+---+---+
|gender| id| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+------+---+---+---+---+---+---+---+---+---+---+---+
|female|  5| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|
|female|  6| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|female|  7| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
|  male|  8| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|
|female|  9| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
|female|  0| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|
|  male|  1| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|female|  2|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|
|female|  3| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|
|female|  4| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|
+------+---+---+---+---+---+---+---+---+---+---+---+

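Note that the columns come back in alphabetical order, since JSON objects carry no inherent column ordering. A select restores the original order; a sketch:

# reimpose the original column order
cols = ['id', 'gender'] + [f'x{i}' for i in range(10)]
spark.read.json('/user/root/data.json').select(cols).show()
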
4.3.5. Parquet

Parquet files are easy to read. Since our last write partitioned the data by gender, that column is reconstructed from the directory names and shows up last in the schema.

[22]:
spark.read.parquet('/user/root/data.parquet').show()
+---+---+---+---+---+---+---+---+---+---+---+------+
| id| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|gender|
+---+---+---+---+---+---+---+---+---+---+---+------+
|  5| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|female|
|  6| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|female|
|  7| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|female|
|  9| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|female|
|  0| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|female|
|  2|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|female|
|  3| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|female|
|  4| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|female|
|  1| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|  male|
|  8| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|  male|
+---+---+---+---+---+---+---+---+---+---+---+------+

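Because this data was written partitioned by gender, a filter on that column lets Spark read only the matching directories (partition pruning) rather than scanning everything. A sketch:

# only the gender=male directory is scanned
spark.read.parquet('/user/root/data.parquet')\
    .where("gender = 'male'")\
    .show()
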
4.3.6. ORC

Reading ORC files works just as well.

[23]:
spark.read.orc('/user/root/data.orc').show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
|  5|female| 86| 70| 57| 74|  8| 11| 84| 42| 93| 16|
|  6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|  7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
|  8|  male| 85|  4| 50| 71| 72| 40| 88| 55| 30| 52|
|  9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
|  0|female| 46| 47|  5| 89| 68| 10| 28| 85|  6| 79|
|  1|  male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|  2|female|  1| 68|  3| 62| 53| 61| 97| 28| 35| 15|
|  3|female| 55| 87| 51|100| 17| 41|  6| 41| 45| 56|
|  4|female| 12| 73|  7| 58| 97| 34| 86| 68| 86| 14|
+---+------+---+---+---+---+---+---+---+---+---+---+