1. Resilient Distributed Datasets (RDD)
Think of an RDD as a distributed dataset. From a Pythonic point of view, imagine a list of integers. This list of integers is one dataset and sits on one computer.
data = [18, 19, 21, 17]
Now, imagine we could somehow split this dataset into two parts and place them on different computers.
data_part_1 = [18, 19] # goes to computer 1
data_part_2 = [21, 17] # goes to computer 2
At its most basic level, an RDD is conceptually a collection of elements that is spread around different computers.
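To make this concrete, here is a minimal sketch (assuming the same Spark context sc used in the cells below; the variable names are just for illustration) that asks Spark to split the list into two partitions and then shows which elements landed in which partition. The exact split may differ from what is shown in the comment.
data = [18, 19, 21, 17]
rdd = sc.parallelize(data, numSlices=2)  # ask for 2 partitions (think: 2 computers)
rdd.glom().collect()                     # e.g. [[18, 19], [21, 17]], one inner list per partition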
1.1. Acquiring an RDD
How do we create an RDD, and where does an RDD come from? RDDs may be created programmatically or by reading files.
1.1.1. Creating an RDD
The easiest way to programmatically create an RDD is to use the parallelize() method of the Spark context sc. Note that we pass in a list of numbers; the list is generated using a list comprehension.
[1]:
num_rdd = sc.parallelize([i for i in range(10)])
type(num_rdd)
[1]:
pyspark.rdd.RDD
1.1.2. Creating a pair RDD
Just think of a pair RDD as a distributed dataset whose records are key-value pairs. From a Pythonic point of view, think of a list of tuples. This list of tuples is one dataset and sits on one computer. For each tuple in this list,

* the first element is a name and plays the role of the key, and
* the second element is an age and plays the role of the value.
data = [('john', 18), ('jack', 19), ('jill', 21), ('jenn', 17)]
Now, imagine we could somehow split this dataset into two parts and place them on different computers.
data_part_1 = [('john', 18), ('jack', 19)] # goes to computer 1
data_part_2 = [('jill', 21), ('jenn', 17)] # goes to computer 2
At its most basic level, a pair RDD is conceptually a collection of 2-tuples that is spread around different computers. Below, we create a pair RDD where the key is a number and the value is the key multiplied by itself.
[2]:
pair_rdd = sc.parallelize([(i, i*i) for i in range(10)])
type(pair_rdd)
[2]:
pyspark.rdd.RDD
1.1.3. Reading an RDD from HDFS
If we store a CSV file in HDFS (the Hadoop Distributed File System), we can read its contents into an RDD via sc.textFile().
[3]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')
type(data_rdd)
[3]:
pyspark.rdd.RDD
1.2. Transformations
After we acquire an RDD, we can perform two broad categories of operations.

* Transformation: an operation to change the data
* Action: an operation to collect the data

Transformation operations are lazily evaluated. Just because you have applied a transformation to an RDD does not mean anything will happen. Only when you execute an action against the RDD does computation actually start. Let’s look at some types of transformations that we may perform against RDDs.
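One way to see the laziness is to put a side effect inside the mapping function (the square() helper below is hypothetical, purely for illustration): nothing runs when the transformation is defined, and the work only happens when an action is invoked.
def square(x):
    print(f'squaring {x}')  # runs on the executors (visible in the driver output in local mode)
    return x * x

lazy_rdd = num_rdd.map(square)  # returns immediately; no computation happens yet
lazy_rdd.collect()              # the action triggers the actual computation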
1.2.1. Map
The map() function transforms each element into something else. Below, we transform each original number into a new one by

* multiplying the number by itself,
* adding one to the number,
* subtracting one from the number, and
* dividing the number by ten.
[4]:
num_rdd.map(lambda x: x * x).collect()
[4]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[5]:
num_rdd.map(lambda x: x + 1).collect()
[5]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[6]:
num_rdd.map(lambda x: x - 1).collect()
[6]:
[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8]
[7]:
num_rdd.map(lambda x: x / 10).collect()
[7]:
[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
1.2.2. Filter
The filter() method removes elements from an RDD. We must supply filter() with a function that returns True (to keep) or False (to remove) for each element. Below, we filter the data to keep only the even elements, and then only the odd elements.
[8]:
num_rdd.filter(lambda x: x % 2 == 0).collect()
[8]:
[0, 2, 4, 6, 8]
[9]:
num_rdd.filter(lambda x: x % 2 != 0).collect()
[9]:
[1, 3, 5, 7, 9]
1.2.3. Flat map
The flatMap() function flattens lists of lists into a single list of elements. Let’s say we have the following list of lists.
[10]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
We want to flatten this list, data, so that the resulting list is as follows.
[1, 2, 3, 4, 5, 6, 7, 8, 9]
How do we flatten a list of lists in Python? We can use the chain() function from the itertools module.
[11]:
from itertools import chain
list(chain(*data))
[11]:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
When using PySpark, the flatMap() function does the flattening for us.
[12]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sc.parallelize(data).flatMap(lambda x: x).collect()
[12]:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
1.2.4. Sample
If we want to take samples from an RDD, we can use the sample() method. The arguments to sample() are as follows.

* withReplacement indicates whether we want to sample with replacement (records can be selected multiple times)
* fraction specifies the (approximate) fraction of the data we want to bring back
* seed is the seed used to control the randomization during sampling
[13]:
num_rdd.sample(withReplacement=False, fraction=0.2, seed=37).collect()
[13]:
[0, 3, 7]
1.2.5. Union
If we have two RDDs, we can bring them together through union().
[14]:
num_rdd.union(num_rdd).collect()
[14]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1.2.6. Intersection
If we want only the elements that two RDDs have in common, we can apply the intersection() function.
[15]:
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 5])
a.intersection(b).collect()
[15]:
[3]
1.2.7. Distinct
The distinct() function will bring back only unique elements.
[16]:
a = sc.parallelize([1, 2, 2, 3, 4])
a.distinct().collect()
[16]:
[2, 4, 3, 1]
1.2.8. Group by key
If we have a pair RDD, we can group data by key using groupByKey(). After we apply groupByKey(), a new pair RDD is created where

* the key is the same key as before, and
* the value is an iterable of the values associated with that key.

Below, we convert the iterable to a list using the list() function. Note that groupByKey() is an expensive operation because it causes data shuffling. In the Spark framework, we work extra hard to keep data from moving (there is a lot of data, and we do not want to congest the network by moving it around); the only thing we want to move is the compute code. Try to avoid groupByKey() when you can.
[17]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
for key, it in a.groupByKey().collect():
    print(key, list(it))
2 [4, 3]
1 [1, 2]
1.2.9. Reduce by key
If we wanted to collapse all the values associated with a key in a pair RDD, we need to use the reduceByKey()
function. The reduceByKey()
function is much more efficient than groupByKey()
. We should work extra hard to modify logic that works for groupByKey()
to work for and use reduceByKey()
.
Below, we simply sum over all the values associated with a key.
[18]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.reduceByKey(lambda a, b: a + b).collect()
[18]:
[(2, 7), (1, 3)]
Here is an anti-pattern: using groupByKey() to add the elements associated with each key. We get the same result as with reduceByKey(), but with potentially extra overhead (data shuffling).
[19]:
add_elements = lambda tup: (tup[0], sum(list(tup[1])))
sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])\
    .groupByKey()\
    .map(add_elements)\
    .collect()
[19]:
[(2, 7), (1, 3)]
Here’s an interesting dataset. It’s a list of 2-tuples, where the first element is the key (the unique identifier of a person) and the second element is a piece of information about the person (stored in a dictionary). How do we use reduceByKey() to merge all the information according to the unique identifier? If you look below, you will notice that we merge the dictionaries using the dictionary unpacking operator **.
[20]:
data = [
    (1, {'name': 'john'}),
    (2, {'name': 'jack'}),
    (1, {'age': 23}),
    (2, {'age': 24}),
]
sc.parallelize(data).reduceByKey(lambda a, b: {**a, **b}).collect()
[20]:
[(2, {'name': 'jack', 'age': 24}), (1, {'age': 23, 'name': 'john'})]
1.2.10. Aggregate by key
In a pair RDD, we can specify how to aggregate values by key within and between partitions. There are three required arguments.

* an initial value
* a combining function to aggregate within a partition
* a merging function to aggregate between partitions

Below are some examples of how to aggregate by key.
[21]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey('value', lambda s, d: f'{s} {d}', lambda s1, s2: f'{s1}, {s2}').collect()
[21]:
[(2, 'value 4 3'), (1, 'value 1 2')]
[22]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey(0, lambda s, d: s + d, lambda s1, s2: s1 + s2).collect()
[22]:
[(2, 7), (1, 3)]
1.2.11. Sort by key
We can also sort records by key in a pair RDD using sortByKey().
[23]:
a = sc.parallelize([(1, 1), (3, 2), (5, 4), (4, 3)])
a.sortByKey().collect()
[23]:
[(1, 1), (3, 2), (4, 3), (5, 4)]
1.2.12. Join
If we have two pair RDDs, we can perform a join based on the keys using join().
[24]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4)])
a.join(b).collect()
[24]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]
Note that join() is like a SQL inner join; only records with keys in both RDDs will be returned.
[25]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3), (5, 5)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4), (6, 6)])
a.join(b).collect()
[25]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]
1.2.13. Left outer join
The leftOuterJoin() function joins two pair RDDs like a SQL left outer join. All records on the left will be returned even if there is no matching record on the right.
[26]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.leftOuterJoin(b).collect()
[26]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, None))]
1.2.14. Right outer join
The rightOuterJoin() function joins two pair RDDs like a SQL right outer join. All records on the right will be returned even if there is no matching record on the left.
[27]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.rightOuterJoin(b).collect()
[27]:
[(4, (None, 5)), (1, (1, 2)), (2, (2, 3))]
1.2.15. Full outer join
The fullOuterJoin() function joins two pair RDDs like a SQL full outer join. All records from both the left and the right will be returned.
[28]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.fullOuterJoin(b).collect()
[28]:
[(4, (None, 5)), (1, (1, 2)), (2, (2, 3)), (3, (3, None))]
1.2.16. Cogroup
The cogroup() function brings the values from two pair RDDs together, grouped by key.
[29]:
a = sc.parallelize([(1, 1), (1, 2), (2, 3), (2, 4), (3, 5), (3, 6)])
b = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (3, 'e'), (3, 'f')])
for key, (it1, it2) in a.cogroup(b).collect():
    print(key, list(it1), list(it2))
1 [1, 2] ['a', 'b']
2 [3, 4] ['d', 'c']
3 [5, 6] ['e', 'f']
1.2.17. Cartesian
In Python, if we had the two lists below and wanted the Cartesian product of those two lists, we would use product from the itertools module.
[30]:
from itertools import product
a = [1, 2, 3, 4]
b = ['a', 'b', 'c', 'd']
list(product(*[a, b]))
[30]:
[(1, 'a'),
(1, 'b'),
(1, 'c'),
(1, 'd'),
(2, 'a'),
(2, 'b'),
(2, 'c'),
(2, 'd'),
(3, 'a'),
(3, 'b'),
(3, 'c'),
(3, 'd'),
(4, 'a'),
(4, 'b'),
(4, 'c'),
(4, 'd')]
We can achieve the same using the cartesian() function on two RDDs.
[31]:
a = sc.parallelize([1, 2, 3, 4])
b = sc.parallelize(['a', 'b', 'c', 'd'])
a.cartesian(b).collect()
[31]:
[(1, 'a'),
(1, 'b'),
(2, 'a'),
(2, 'b'),
(1, 'c'),
(1, 'd'),
(2, 'c'),
(2, 'd'),
(3, 'a'),
(3, 'b'),
(4, 'a'),
(4, 'b'),
(3, 'c'),
(3, 'd'),
(4, 'c'),
(4, 'd')]
1.2.18. Repartition
We can force our distributed dataset (the RDD) into a specified number of partitions using repartition().
[32]:
a = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
print(a.getNumPartitions())
a = a.repartition(4)
print(a.getNumPartitions())
2
4
If we want to see the elements in each partition, call the glom() function on the RDD. You can see that the data is skewed across the 4 requested partitions: some partitions are empty and others are not. In general, this is not what repartition() produces.
[33]:
a.glom().collect()
[33]:
[[], [5, 6, 7, 8], [], [1, 2, 3, 4]]
Rather, in general, calling repartition() will generate the specified number of partitions with roughly equal numbers of elements. We can inspect the result of repartition() as follows.
[34]:
import pandas as pd
import numpy as np
def get_stats(n_partitions, elements_per_partition):
    min_elements = min(elements_per_partition)
    max_elements = max(elements_per_partition)
    avg_elements = np.mean(elements_per_partition)
    std_elements = np.std(elements_per_partition)
    return {
        'n_partitions': n_partitions,
        'min_elements': min_elements,
        'max_elements': max_elements,
        'avg_elements': avg_elements,
        'std_elements': std_elements,
        'elements_per_partition': elements_per_partition
    }
a = sc.parallelize([i for i in range(10000)])
pd.DataFrame([get_stats(n_partitions, a.repartition(n_partitions).glom().map(len).collect()) for n_partitions in range(2, 10)])
[34]:
|   | n_partitions | min_elements | max_elements | avg_elements | std_elements | elements_per_partition |
|---|---|---|---|---|---|---|
| 0 | 2 | 5000 | 5000 | 5000.000000 | 0.000000 | [5000, 5000] |
| 1 | 3 | 3330 | 3340 | 3333.333333 | 4.714045 | [3330, 3330, 3340] |
| 2 | 4 | 2500 | 2500 | 2500.000000 | 0.000000 | [2500, 2500, 2500, 2500] |
| 3 | 5 | 2000 | 2000 | 2000.000000 | 0.000000 | [2000, 2000, 2000, 2000, 2000] |
| 4 | 6 | 1660 | 1670 | 1666.666667 | 4.714045 | [1670, 1670, 1670, 1660, 1660, 1670] |
| 5 | 7 | 1420 | 1440 | 1428.571429 | 6.388766 | [1420, 1420, 1430, 1430, 1430, 1430, 1440] |
| 6 | 8 | 1250 | 1250 | 1250.000000 | 0.000000 | [1250, 1250, 1250, 1250, 1250, 1250, 1250, 1250] |
| 7 | 9 | 1100 | 1120 | 1111.111111 | 5.665577 | [1120, 1110, 1110, 1110, 1110, 1110, 1100, 111... |
1.2.19. Coalesce
We can also force our RDD into a specified number of partitions using coalesce().
[35]:
a = sc.parallelize(['hello', 'world'])
print(a.getNumPartitions())
a = a.coalesce(2)
print(a.getNumPartitions())
2
2
So, repartition() and coalesce() seem to do the same thing: forcing the data into a specified number of partitions. What’s the difference?

* repartition() incurs the cost of shuffling data and creating new partitions; however, the resulting partitions are roughly equal in size.
* coalesce() minimizes shuffling and reuses existing partitions; however, the resulting partitions will most likely not be roughly equal in size.

Which should I use? It depends on your goals and/or preferences. If you want computation to be evenly distributed, go for repartition(); otherwise, save time by not shuffling data and use coalesce().
1.2.20. Pipe
The pipe() function enables you to specify an external script or program to transform the data. The script or program must be able to receive the data as input and should return an output. The script must be accessible on all the compute nodes; a common mistake is that the script only exists on the driver node, and the piping fails.
In the code below, the script is invoked once per partition, so we see one One-Off Coder line per partition; here the RDD has two partitions (one holding hello and the other holding world). The script is just a simple echo and looks like the following.
#!/bin/sh
echo 'One-Off Coder'
while read LINE; do
    echo ${LINE}
done
[36]:
a = sc.parallelize(['hello', 'world'])
a.pipe('/root/ipynb/echo.sh').collect()
[36]:
['One-Off Coder', 'hello', 'One-Off Coder', 'world']
If we explicitly repartition to 2 partitions, the output is unchanged.
[37]:
a = sc.parallelize(['hello', 'world']).repartition(2)
a.pipe('/root/ipynb/echo.sh').collect()
[37]:
['One-Off Coder', 'hello', 'One-Off Coder', 'world']
If we force the number of partitions to 1, then all the data will be fed to one instance of the script (that’s why we see One-Off Coder only once).
[38]:
a = sc.parallelize(['hello', 'world']).repartition(1)
a.pipe('/root/ipynb/echo.sh').collect()
[38]:
['One-Off Coder', 'world', 'hello']
1.2.21. Repartition and sort within partitions
For a pair RDD, we can control how many partitions we want and which records go into which partitions with repartitionAndSortWithinPartitions(). What we get for free is sorting within each partition. The arguments for repartitionAndSortWithinPartitions() are as follows.

* numPartitions specifies the number of desired partitions
* partitionFunc specifies how to assign records to partitions
* ascending specifies whether to sort in ascending order
* keyfunc specifies how to retrieve the key
[39]:
sc.parallelize([(1, 5), (2, 15), (1, 4), (2, 14), (1, 3), (2, 13)])\
    .map(lambda tup: (tup, tup[1]))\
    .repartitionAndSortWithinPartitions(
        numPartitions=2,
        partitionFunc=lambda tup: tup[0] % 2)\
    .map(lambda tup: tup[0])\
    .collect()
[39]:
[(2, 13), (2, 14), (2, 15), (1, 3), (1, 4), (1, 5)]
1.3. Actions
Remember, transformations on RDDs create other RDDs and are lazily evaluated (no computational cost is incurred until an action runs). On the other hand, when an action is applied to an RDD, the result is not an RDD, and the data is typically returned from the worker nodes to the driver (and the user).
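As a quick sketch of the distinction (reusing num_rdd from above), a transformation returns another RDD, whereas an action returns plain Python objects on the driver.
transformed = num_rdd.map(lambda x: x + 1)  # transformation: still an RDD, nothing computed yet
print(type(transformed))                    # an RDD subclass, e.g. pyspark.rdd.PipelinedRDD
result = transformed.collect()              # action: data is brought back to the driver
print(type(result))                         # a plain Python list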
1.3.1. Reduce
The reduce() function collapses all the elements into one.
[40]:
a = sc.parallelize([1, 2, 3])
a.reduce(lambda a, b: a + b)
[40]:
6
Reducing data does not have to involve math operations like addition. Below, we merge the dictionaries into one.
[41]:
a = sc.parallelize([{'fname': 'john'}, {'lname': 'doe'}, {'age': 32}])
a.reduce(lambda a, b: {**a, **b})
[41]:
{'fname': 'john', 'lname': 'doe', 'age': 32}
We can also reduce data by selecting the smallest value.
[42]:
from random import randint
a = sc.parallelize([randint(10, 1000) for _ in range(100)])
a.reduce(lambda a, b: min(a, b))
[42]:
12
1.3.2. Collect
The collect() function is an action that we have been using all along. It simply brings the distributed data back into one list on the driver. Be careful, though: if the data is huge, this operation may fail.
[43]:
a = sc.parallelize([1, 2, 3])
a.collect()
[43]:
[1, 2, 3]
1.3.3. Count
The count() function counts the number of elements in an RDD.
[44]:
a = sc.parallelize([1, 2, 3])
a.count()
[44]:
3
Below, we generate 1,000 random numbers in the range \([1, 10]\). We then perform a map() operation creating a list of length \(x\) for each \(x\), followed by a flatMap() and then count().
[45]:
from random import randint
a = sc.parallelize([randint(1, 10) for _ in range(1000)])
a.map(lambda x: [x for _ in range(x)]).flatMap(lambda x: x).count()
[45]:
5521
1.3.4. First
The first() function always returns the first record from an RDD.
[46]:
a = sc.parallelize([1, 2, 3])
a.first()
[46]:
1
1.3.5. Take
We can bring back the first \(n\) records using take().
[47]:
a = sc.parallelize([1, 2, 3])
a.take(2)
[47]:
[1, 2]
1.3.6. Take sample
We can bring back random records using takeSample().
[48]:
a = sc.parallelize([i for i in range(100)])
a.takeSample(withReplacement=False, num=10, seed=37)
[48]:
[62, 46, 36, 88, 25, 22, 51, 16, 0, 52]
1.3.7. Take ordered
We can bring back the smallest \(n\) records, in sorted order, using takeOrdered().
[49]:
from random import randint
a = sc.parallelize([randint(1, 10000) for _ in range(1000)])
a.takeOrdered(10)
[49]:
[3, 6, 15, 46, 46, 62, 76, 79, 81, 84]
1.3.8. Count by key
Counting the number of records associated with each key is accomplished through countByKey().
[50]:
a = sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])
a.countByKey()
[50]:
defaultdict(int,
{5: 986,
3: 1008,
1: 995,
9: 1021,
2: 983,
4: 982,
6: 1035,
8: 996,
10: 999,
7: 995})
1.4. Chaining transformations and actions
The power of transformations and actions emerges from chaining them together.
1.4.1. Map, filter, reduce
The three basic functions introduced when we start to adopt functional programming are map(), filter() and reduce(). Below, we map each number \(x\) to \(x \times x\), filter for only even numbers, and then add the results.
[51]:
num_rdd = sc.parallelize([i for i in range(10)])
num_rdd\
    .map(lambda x: x * x)\
    .filter(lambda x: x % 2 == 0)\
    .reduce(lambda a, b: a + b)
[51]:
120
1.4.2. Filter, map, take
Here’s an example of parsing a CSV file. Note that we have to filter out the row starting with x, since that indicates the header (for this CSV file). We then split (or tokenize) each line using a comma , as the delimiter. Finally, we convert all the tokens, which are strings, to integers. We take the first 10 records (rows) to see if we parsed the CSV file correctly.
[52]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')
data_rdd\
    .filter(lambda s: False if s.startswith('x') else True)\
    .map(lambda s: s.split(','))\
    .map(lambda arr: [int(s) for s in arr])\
    .take(10)
[52]:
[[41, 82, 16, 51, 11, 85, 46, 50, 1, 55],
[16, 24, 44, 88, 48, 33, 69, 51, 50, 90],
[49, 62, 57, 9, 76, 15, 46, 39, 53, 29],
[66, 18, 85, 48, 27, 76, 74, 8, 95, 64],
[28, 6, 41, 46, 94, 75, 41, 4, 100, 32],
[72, 57, 47, 69, 90, 10, 42, 46, 35, 65],
[64, 9, 66, 75, 63, 35, 22, 86, 75, 34],
[39, 87, 12, 58, 73, 26, 15, 32, 99, 23],
[82, 33, 6, 22, 13, 100, 99, 18, 52, 32],
[80, 30, 21, 1, 47, 56, 70, 65, 98, 27]]
1.4.3. Merging dictionaries
We already saw some examples of merging dictionaries. Here’s another example.
[53]:
sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda tup: {tup[0]: tup[1]})\
    .reduce(lambda a, b: {**a, **b})
[53]:
{4: 964,
2: 974,
10: 989,
8: 1009,
6: 991,
1: 993,
5: 1021,
9: 1023,
7: 1018,
3: 1018}
1.5. Broadcasting variables
If we have data that needs to be shared across the worker nodes, we can broadcast that data. Below, we have a dictionary m that is local to the driver, and we want to broadcast it (make it available) to all the worker nodes. We broadcast m with sc.broadcast() and assign the reference to b; note that b wraps the data m, and we can access the dictionary through the value property of b (e.g. b.value). Now our parallel operations can access the dictionary.
[54]:
from random import randint
m = {i: randint(1, 10) for i in range(101)}
b = sc.broadcast(m)
sc.parallelize([randint(1, 100) for _ in range(20000)])\
    .map(lambda num: (b.value[num], 1))\
    .reduceByKey(lambda a, b: a + b)\
    .collect()
[54]:
[(4, 2403),
(6, 1792),
(8, 1562),
(2, 1795),
(10, 1541),
(5, 2654),
(9, 1463),
(3, 1704),
(7, 3432),
(1, 1654)]
1.6. Accumulator
If we want to keep count of things or put metrics on our operations, we need to use an accumulator. The accumulator is defined locally (on the driver) but is visible across the worker nodes. Below, we use an accumulator to keep track of the number of map operations.
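A minimal sketch of this pattern (assuming the standard sc.accumulator() API; the counter and tally names are just for illustration) counts how many times the mapping function is invoked. Only the driver can reliably read the accumulator's value, and only after an action has run.
counter = sc.accumulator(0)  # an integer accumulator starting at 0

def tally(x):
    counter.add(1)           # worker tasks may only add to the accumulator
    return x * x

sc.parallelize(range(10)).map(tally).collect()
counter.value                # read on the driver; 10 after the action above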