4. Input/Output
Reading and writing data are fundamental operations in any Spark workflow. Let’s see how Spark handles both across several common file formats.
4.1. Data
Here’s some mock data we will create and use to illustrate IO operations.
[1]:
import pandas as pd
from random import randint, choice

def get_record(idx, n_cols):
    # one row: an id, a random gender, and n_cols random integers in [1, 100]
    gender = choice(['male', 'female'])
    data = [idx, gender] + [randint(1, 100) for _ in range(n_cols)]
    return tuple(data)

n_cols = 10
n_rows = 10

data = [get_record(i, n_cols) for i in range(n_rows)]
columns = ['id', 'gender'] + [f'x{i}' for i in range(n_cols)]
# sqlContext is the pre-2.0 entry point; spark.createDataFrame works the same way
df = sqlContext.createDataFrame(pd.DataFrame(data, columns=columns))
[2]:
df.show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
| 0|female| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|
| 1| male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
| 2|female| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|
| 3|female| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|
| 4|female| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|
| 5|female| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|
| 6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
| 7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
| 8| male| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52|
| 9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+
[3]:
df.printSchema()
root
|-- id: long (nullable = true)
|-- gender: string (nullable = true)
|-- x0: long (nullable = true)
|-- x1: long (nullable = true)
|-- x2: long (nullable = true)
|-- x3: long (nullable = true)
|-- x4: long (nullable = true)
|-- x5: long (nullable = true)
|-- x6: long (nullable = true)
|-- x7: long (nullable = true)
|-- x8: long (nullable = true)
|-- x9: long (nullable = true)
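The schema above was inferred from the pandas dtypes. If we’d rather not round-trip through pandas, here is a sketch of building the same DataFrame directly with an explicit schema (reusing the data and n_cols defined above, and assuming the usual spark session):
[ ]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Build the same DataFrame without pandas, spelling out the schema explicitly.
schema = StructType(
    [StructField('id', LongType(), True), StructField('gender', StringType(), True)]
    + [StructField(f'x{i}', LongType(), True) for i in range(n_cols)]
)
spark.createDataFrame(data, schema=schema).printSchema()  # matches the schema above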
4.2. Writing data
4.2.1. CSV
To write a CSV file, we have to specify the format, the save mode, and any options, such as whether to write a header.
[4]:
df.write\
    .format('com.databricks.spark.csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')
Note that multiple CSV part files are written, since each partition of the data is written out in parallel.
[5]:
%%sh
hdfs dfs -ls /user/root/data.csv | awk '{print $8}'
/user/root/data.csv/_SUCCESS
/user/root/data.csv/part-00000-b435899e-e687-467a-9182-b6f7753db8b0-c000.csv
/user/root/data.csv/part-00001-b435899e-e687-467a-9182-b6f7753db8b0-c000.csv
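The long-form name com.databricks.spark.csv comes from the original external spark-csv package; since Spark 2.0 the CSV source is built in, so the short name works too:
[ ]:
# The built-in CSV source (Spark 2.0+); equivalent to the cell above.
df.write\
    .format('csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')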
4.2.2. As one CSV file
If we want to write only one file, we first need to repartition the data into a single partition.
[6]:
df.repartition(1).write\
    .format('com.databricks.spark.csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')
[7]:
%%sh
hdfs dfs -ls /user/root/data.csv | awk '{print $8}'
/user/root/data.csv/_SUCCESS
/user/root/data.csv/part-00000-c15d5a89-dcf8-4815-b075-e5a1af1cd2e4-c000.csv
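Since repartition(1) triggers a full shuffle, coalesce(1) is a cheaper way to get the same single-file result; a sketch:
[ ]:
# coalesce(1) merges the existing partitions without a full shuffle.
df.coalesce(1).write\
    .format('com.databricks.spark.csv')\
    .mode('overwrite')\
    .option('header', 'true')\
    .save('/user/root/data.csv')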
4.2.3. JSON
Writing data as JSON is accomplished by specifying the JSON format.
[8]:
df.write\
    .format('json')\
    .mode('overwrite')\
    .save('/user/root/data.json')
[9]:
%%sh
hdfs dfs -ls /user/root/data.json | awk '{print $8}'
/user/root/data.json/_SUCCESS
/user/root/data.json/part-00000-b7e4462b-99cb-4cba-b430-2738cd8f8f35-c000.json
/user/root/data.json/part-00001-b7e4462b-99cb-4cba-b430-2738cd8f8f35-c000.json
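The JSON writer also accepts a compression option; a sketch using gzip:
[ ]:
# Write gzip-compressed JSON part files.
df.write\
    .format('json')\
    .mode('overwrite')\
    .option('compression', 'gzip')\
    .save('/user/root/data.json')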
4.2.4. Parquet
Parquet files are also easy to create.
[10]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .save('/user/root/data.parquet')
[11]:
%%sh
hdfs dfs -ls /user/root/data.parquet | awk '{print $8}'
/user/root/data.parquet/_SUCCESS
/user/root/data.parquet/part-00000-cd6f0ab1-b720-4e23-9a85-1b11eec25ff3-c000.snappy.parquet
/user/root/data.parquet/part-00001-cd6f0ab1-b720-4e23-9a85-1b11eec25ff3-c000.snappy.parquet
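The .snappy infix in the file names shows Parquet’s default compression codec; it can be overridden per write, for example with gzip:
[ ]:
# Override the default snappy codec for this write.
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .option('compression', 'gzip')\
    .save('/user/root/data.parquet')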
4.2.5. Parquet with partitions
If we want to create partitioned Parquet files, we need to specify which field to partition by.
[12]:
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .partitionBy('gender')\
    .save('/user/root/data.parquet')
[13]:
%%sh
hdfs dfs -ls /user/root/data.parquet | awk '{print $8}'
/user/root/data.parquet/_SUCCESS
/user/root/data.parquet/gender=female
/user/root/data.parquet/gender=male
[14]:
%%sh
hdfs dfs -ls /user/root/data.parquet/gender=female | awk '{print $8}'
/user/root/data.parquet/gender=female/part-00000-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
/user/root/data.parquet/gender=female/part-00001-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
[15]:
%%sh
hdfs dfs -ls /user/root/data.parquet/gender=male | awk '{print $8}'
/user/root/data.parquet/gender=male/part-00000-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
/user/root/data.parquet/gender=male/part-00001-4db76505-3e45-41e1-90cb-557bf85c92df.c000.snappy.parquet
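Partitioning pays off at read time: filtering on the partition column lets Spark skip the directories it does not need (partition pruning). A quick sketch:
[ ]:
# Only the gender=male directory is scanned for this query.
spark.read.parquet('/user/root/data.parquet')\
    .where("gender = 'male'")\
    .show()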
4.2.6. ORC
ORC files are created by specifying the ORC file format.
[16]:
df.write\
    .format('orc')\
    .mode('overwrite')\
    .save('/user/root/data.orc')
[17]:
%%sh
hdfs dfs -ls /user/root/data.orc | awk '{print $8}'
/user/root/data.orc/_SUCCESS
/user/root/data.orc/part-00000-c0736144-12b8-4248-ab6b-9b1e18de48b8-c000.snappy.orc
/user/root/data.orc/part-00001-c0736144-12b8-4248-ab6b-9b1e18de48b8-c000.snappy.orc
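partitionBy works the same way for the ORC writer; a sketch (the data_partitioned.orc path is just an illustrative name):
[ ]:
# Partitioned ORC output, analogous to the Parquet example above.
df.write\
    .format('orc')\
    .mode('overwrite')\
    .partitionBy('gender')\
    .save('/user/root/data_partitioned.orc')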
4.3. Reading
4.3.1. Whole text file
The function wholeTextFiles() reads whole files into an RDD of 2-tuples, where the first element is the path of the file and the second element is the file’s entire content. Note the wildcard * in the path below, which lets us match more than one file.
[18]:
pair_rdd = sc.wholeTextFiles('hdfs://localhost/*.csv')
item = pair_rdd.collect()[0]
print(f'item[0] = {item[0]}')
print(f'item[1][0:90] = {item[1][0:90]}')
item[0] = hdfs://localhost/data.csv
item[1][0:90] = x0,x1,x2,x3,x4,x5,x6,x7,x8,x9
14,22,25,63,47,52,13,14,23,27
35,80,38,28,73,69,21,16,76,53
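Since each element holds an entire file’s content, wholeTextFiles() is best suited to many small files. A sketch of splitting the contents back into individual lines:
[ ]:
# Explode each (path, content) pair into the file's individual lines.
lines = pair_rdd.flatMap(lambda kv: kv[1].splitlines())
lines.take(3)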
4.3.2. Text file by lines
To read a file line by line, use textFile(), which returns an RDD of strings.
[19]:
rdd = sc.textFile('hdfs://localhost/data.csv')
rdd.take(5)
[19]:
['x0,x1,x2,x3,x4,x5,x6,x7,x8,x9',
'14,22,25,63,47,52,13,14,23,27',
'35,80,38,28,73,69,21,16,76,53',
'46,37,46,55,78,68,61,62,81,82',
'19,12,45,50,71,63,94,7,10,77']
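With textFile() the parsing is up to us; a sketch of skipping the header and converting each line to integers:
[ ]:
# Drop the header row, then parse the comma-separated values.
header = rdd.first()
parsed = rdd.filter(lambda line: line != header)\
    .map(lambda line: [int(v) for v in line.split(',')])
parsed.take(2)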
4.3.3. CSV
We can also read CSV files back into a DataFrame; here we ask Spark to infer the schema from the data.
[20]:
spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('/user/root/data.csv')\
    .show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
| 0|female| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|
| 1| male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
| 2|female| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|
| 3|female| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|
| 4|female| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|
| 5|female| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|
| 6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
| 7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
| 8| male| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52|
| 9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
+---+------+---+---+---+---+---+---+---+---+---+---+
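Note that inferSchema makes Spark take an extra pass over the data; supplying a schema up front avoids it. Reusing df.schema below is just a convenient sketch; in practice you would define the schema explicitly:
[ ]:
# An explicit schema skips the inference pass over the data.
spark.read.format('csv')\
    .option('header', 'true')\
    .schema(df.schema)\
    .load('/user/root/data.csv')\
    .show(3)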
4.3.4. JSON
JSON files are read as shown below. The JSON source always infers the schema by sampling the data, so no inferSchema option is needed; and because JSON objects are unordered, the columns come back in alphabetical order.
[21]:
spark.read.format('json')\
    .load('/user/root/data.json')\
    .show()
+------+---+---+---+---+---+---+---+---+---+---+---+
|gender| id| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+------+---+---+---+---+---+---+---+---+---+---+---+
|female| 5| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|
|female| 6| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
|female| 7| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
| male| 8| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52|
|female| 9| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
|female| 0| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|
| male| 1| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
|female| 2| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|
|female| 3| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|
|female| 4| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|
+------+---+---+---+---+---+---+---+---+---+---+---+
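To restore the original column order, we can reuse the columns list defined earlier:
[ ]:
# Reapply the original column order after the alphabetical JSON read.
spark.read.format('json')\
    .load('/user/root/data.json')\
    .select(columns)\
    .show(3)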
4.3.5. Parquet
Parquet files are easy to read. Since the most recent write was partitioned by gender, Spark recovers that column from the directory names and appends it as the last column of the schema.
[22]:
spark.read.parquet('/user/root/data.parquet').show()
+---+---+---+---+---+---+---+---+---+---+---+------+
| id| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|gender|
+---+---+---+---+---+---+---+---+---+---+---+------+
| 5| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|female|
| 6| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|female|
| 7| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|female|
| 9| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|female|
| 0| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|female|
| 2| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|female|
| 3| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|female|
| 4| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|female|
| 1| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35| male|
| 8| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52| male|
+---+---+---+---+---+---+---+---+---+---+---+------+
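A single partition directory can also be read directly; the gender column then disappears, since it only ever existed in the directory name:
[ ]:
# Read just the male partition; no gender column is reconstructed.
spark.read.parquet('/user/root/data.parquet/gender=male').show()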
4.3.6. ORC
Reading ORC files poses no problems either.
[23]:
spark.read.orc('/user/root/data.orc').show()
+---+------+---+---+---+---+---+---+---+---+---+---+
| id|gender| x0| x1| x2| x3| x4| x5| x6| x7| x8| x9|
+---+------+---+---+---+---+---+---+---+---+---+---+
| 5|female| 86| 70| 57| 74| 8| 11| 84| 42| 93| 16|
| 6|female| 64| 60| 50| 28| 27| 69| 42| 58| 60| 11|
| 7|female| 36| 97| 58| 92| 26| 74| 76| 17| 40| 42|
| 8| male| 85| 4| 50| 71| 72| 40| 88| 55| 30| 52|
| 9|female| 16| 94| 42| 27| 78| 94| 37| 91| 87| 53|
| 0|female| 46| 47| 5| 89| 68| 10| 28| 85| 6| 79|
| 1| male| 61| 48| 88| 27| 86| 45| 29| 11| 20| 35|
| 2|female| 1| 68| 3| 62| 53| 61| 97| 28| 35| 15|
| 3|female| 55| 87| 51|100| 17| 41| 6| 41| 45| 56|
| 4|female| 12| 73| 7| 58| 97| 34| 86| 68| 86| 14|
+---+------+---+---+---+---+---+---+---+---+---+---+
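Finally, the orc() and parquet() shorthands are just conveniences; the generic format/load pair we used for CSV and JSON works for every source:
[ ]:
# Equivalent to spark.read.orc(...) above.
spark.read.format('orc')\
    .load('/user/root/data.orc')\
    .show(3)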