1. Resilient Distributed Datasets (RDD)

Think of an RDD as a distributed dataset. From a Pythonic point of view, imagine a list of integers. This list of integers is one dataset and sits on one computer.

data = [18, 19, 21, 17]

Now, imagine we could somehow split this dataset into two parts and place them on different computers.

data_part_1 = [18, 19] # goes to computer 1
data_part_2 = [21, 17] # goes to computer 2

At its most basic level, an RDD is conceptually a collection of elements that is spread around different computers.

1.1. Acquiring an RDD

How do we create an RDD, or where does an RDD come from? RDDs may be created programmatically or by reading files.

1.1.1. Creating an RDD

The easiest way to programmatically create an RDD is to use the parallelize() method of the SparkContext sc. Note that we pass in a list of numbers; the list of numbers is generated using a list comprehension.

[1]:
num_rdd = sc.parallelize([i for i in range(10)])
type(num_rdd)
[1]:
pyspark.rdd.RDD

1.1.2. Create a pair RDD

Just think of a pair RDD as a distributed dataset whose records are key-value pairs. From a Pythonic point of view, think about a list of tuples. This list of tuples is one dataset and sits on one computer. For each tuple in this list,

  • the first element is a name and plays the role of the key, and

  • the second element is an age and plays the role of the value.

data = [('john', 18), ('jack', 19), ('jill', 21), ('jenn', 17)]

Now, imagine we could somehow split this dataset into two parts and place them on different computers.

data_part_1 = [('john', 18), ('jack', 19)] # goes to computer 1
data_part_2 = [('jill', 21), ('jenn', 17)] # goes to computer 2

At its most basic level, a pair RDD is conceptually a collection of 2-tuples that is spread around different computers. Below, we create a pair RDD where the key is a number and the value is the key multiplied by itself.

[2]:
pair_rdd = sc.parallelize([(i, i*i) for i in range(10)])
type(pair_rdd)
[2]:
pyspark.rdd.RDD

1.1.3. Read an RDD from HDFS

If we store a CSV file in HDFS (Hadoop Distributed File System), we can read the contents into an RDD via sc.textFile(). Each line of the file becomes an element of the RDD.

[3]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')
type(data_rdd)
[3]:
pyspark.rdd.RDD

1.2. Transformations

After we acquire an RDD, we can perform two broad categories of operations.

  • Transformation: an operation that produces a new RDD from an existing one (e.g. by changing the data)

  • Action: an operation that triggers computation and returns a result (e.g. collecting the data)

Transformation operations are lazily evaluated. Applying a transformation to an RDD does not, by itself, cause anything to happen. Only when you execute an action against the RDD does computation actually start. Let’s look at some types of transformations that we may perform against RDDs.
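
To see the laziness for yourself, here is a minimal sketch (reusing the SparkContext sc; the helper slow_add_one is made up for this illustration). Building the map() returns immediately; the sleeps only happen once collect() is called.

import time

def slow_add_one(x):
    time.sleep(1)  # pretend this is expensive work
    return x + 1

lazy_rdd = sc.parallelize(range(4)).map(slow_add_one)  # returns instantly; nothing is computed yet
lazy_rdd.collect()                                     # the computation (and the sleeps) happens here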

1.2.1. Map

The map() function transforms each element into something else. Below, we transform each original number into a new one by

  • multiplying that number by itself,

  • adding one to that number,

  • subtracting one from that number, and

  • dividing that number by ten.

[4]:
num_rdd.map(lambda x: x * x).collect()
[4]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[5]:
num_rdd.map(lambda x: x + 1).collect()
[5]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[6]:
num_rdd.map(lambda x: x - 1).collect()
[6]:
[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8]
[7]:
num_rdd.map(lambda x: x / 10).collect()
[7]:
[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

1.2.2. Filter

The filter() method removes elements from an RDD. We must supply filter() with a function that returns True (to keep) or False (to remove) for each element. Below, we keep only the even elements and then only the odd elements.

[8]:
num_rdd.filter(lambda x: x % 2 == 0).collect()
[8]:
[0, 2, 4, 6, 8]
[9]:
num_rdd.filter(lambda x: x % 2 != 0).collect()
[9]:
[1, 3, 5, 7, 9]

1.2.3. Flat map

The flatMap() function flattens lists of lists into a list of elements. Let’s say we have the following list of lists.

[10]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

What we want to do is to flatten this list data so that the resulting list is as follows.

[1, 2, 3, 4, 5, 6, 7, 8, 9]

How do we flatten a list of lists in plain Python? We can use the chain() function from the itertools module.

[11]:
from itertools import chain

list(chain(*data))
[11]:
[1, 2, 3, 4, 5, 6, 7, 8, 9]

When using PySpark, the flatMap() function does the flattening for us.

[12]:
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
sc.parallelize(data).flatMap(lambda x: x).collect()
[12]:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
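
flatMap() is more general than flattening pre-nested lists: the function we supply may return any iterable per element, and all the results are concatenated. As a small sketch (the sentences are made up for illustration), here we split lines of text into words.

lines = ['spark is fast', 'rdds are resilient']
sc.parallelize(lines).flatMap(lambda line: line.split(' ')).collect()
# ['spark', 'is', 'fast', 'rdds', 'are', 'resilient']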

1.2.4. Sample

If we want to take samples from an RDD, we can use the sample() method. The arguments to sample() are as follows.

  • withReplacement indicates whether we want to sample with replacement (records can be selected multiple times)

  • fraction specifies the expected fraction of the data to bring back (the exact number of sampled elements is not guaranteed)

  • seed is the random seed, which makes the sampling reproducible

[13]:
num_rdd.sample(withReplacement=False, fraction=0.2, seed=37).collect()
[13]:
[0, 3, 7]

1.2.5. Union

If we have two RDDs, we can bring them together through union(). Note that duplicates are kept.

[14]:
num_rdd.union(num_rdd).collect()
[14]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

1.2.6. Intersection

If we want only the elements that two RDDs have in common, we can apply the intersection() function.

[15]:
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 5])

a.intersection(b).collect()
[15]:
[3]

1.2.7. Distinct

The distinct() function will bring back only the unique elements (note that the order of the results is not guaranteed).

[16]:
a = sc.parallelize([1, 2, 2, 3, 4])
a.distinct().collect()
[16]:
[2, 4, 3, 1]

1.2.8. Group by key

If we have a pair RDD, we can group the data by key using groupByKey(). After we apply groupByKey(), a new pair RDD is created where

  • the key is the same key as before, and

  • the value is an iterable over all the values associated with that key.

Below, we convert each iterable to a list using the list() function. Note that groupByKey() is an expensive operation because it shuffles data across the cluster. In the Spark framework, we work extra hard to keep data from moving (there is a lot of data, and we do not want to congest the network by moving it around); the only thing we want to move is the compute code. Try to avoid groupByKey() when you can.

[17]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])

for key, it in a.groupByKey().collect():
    print(key, list(it))
2 [4, 3]
1 [1, 2]

1.2.9. Reduce by key

If we want to collapse all the values associated with a key in a pair RDD, we can use the reduceByKey() function. The reduceByKey() function is much more efficient than groupByKey() because values are combined locally within each partition before any data is shuffled; the reducing function must be associative and commutative. It is worth the extra effort to rewrite logic that uses groupByKey() to use reduceByKey() instead.

Below, we simply sum over all the values associated with a key.

[18]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.reduceByKey(lambda a, b: a + b).collect()
[18]:
[(2, 7), (1, 3)]

Here is an anti-pattern: using groupByKey() to add the values associated with each key. We get the same result as with reduceByKey(), but with extra overhead (all the values are shuffled before being summed).

[19]:
add_elements = lambda tup: (tup[0], sum(list(tup[1])))

sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])\
    .groupByKey()\
    .map(add_elements)\
    .collect()
[19]:
[(2, 7), (1, 3)]

Here’s an interesting dataset. It’s a list of 2-tuples, where the first element is the key, or unique identifier, of a person, and the second element is a piece of information about the person (stored in a dictionary). How do we use reduceByKey() to merge all the information according to the unique identifier? If you look below, you will notice that we merge the dictionaries using the dictionary unpacking operator **.

[20]:
data = [
    (1, {'name': 'john'}),
    (2, {'name': 'jack'}),
    (1, {'age': 23}),
    (2, {'age': 24}),
]

sc.parallelize(data).reduceByKey(lambda a, b: {**a, **b}).collect()
[20]:
[(2, {'name': 'jack', 'age': 24}), (1, {'age': 23, 'name': 'john'})]

1.2.10. Aggregate by key

In a pair RDD, we can specify how to aggregate values by key within and between partitions using aggregateByKey(). There are three required arguments.

  • an initial value

  • a combining function to aggregate within a partition

  • a merging function to aggregate between partitions

Below are some examples of how to aggregate by key.

[21]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey('value', lambda s, d: f'{s} {d}', lambda s1, s2: f'{s1}, {s2}').collect()
[21]:
[(2, 'value 4 3'), (1, 'value 1 2')]
[22]:
a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey(0, lambda s, d: s + d, lambda s1, s2: s1 + s2).collect()
[22]:
[(2, 7), (1, 3)]
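
As a slightly more involved sketch (reusing the pairs from above), we can compute a per-key average by aggregating a (sum, count) pair and dividing at the end; mapValues() transforms only the values of a pair RDD.

a = sc.parallelize([(1, 1), (1, 2), (2, 4), (2, 3)])
a.aggregateByKey((0, 0),
                 lambda acc, v: (acc[0] + v, acc[1] + 1),    # within a partition: add the value, bump the count
                 lambda x, y: (x[0] + y[0], x[1] + y[1]))\
    .mapValues(lambda acc: acc[0] / acc[1])\
    .collect()
# [(2, 3.5), (1, 1.5)] (key order may vary)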

1.2.11. Sort by key

We can also sort records by key in a pair RDD using sortByKey().

[23]:
a = sc.parallelize([(1, 1), (3, 2), (5, 4), (4, 3)])
a.sortByKey().collect()
[23]:
[(1, 1), (3, 2), (4, 3), (5, 4)]
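
sortByKey() also accepts an ascending flag (and a keyfunc), so, as a quick sketch, sorting in descending order is simply the following.

a.sortByKey(ascending=False).collect()
# [(5, 4), (4, 3), (3, 2), (1, 1)]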

1.2.12. Join

If we have two pair RDDs, we can perform a join based on the keys using join().

[24]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4)])
a.join(b).collect()
[24]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]

Note that join() is like a SQL inner join; only records with keys in both RDDs will be returned.

[25]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3), (5, 5)])
b = sc.parallelize([(1, 2), (2, 3), (3, 4), (6, 6)])
a.join(b).collect()
[25]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]

1.2.13. Left outer join

The leftOuterJoin() function joins two pair RDDs like a SQL left-outer join. All records on the left will be returned even if there is no matching record on the right; the missing right-hand values are filled with None.

[26]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.leftOuterJoin(b).collect()
[26]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, None))]

1.2.14. Right outer join

The rightOuterJoin() function joins two pair RDDs like a SQL right-outer join. All records on the right will be returned even if there is no matching record on the left; the missing left-hand values are filled with None.

[27]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.rightOuterJoin(b).collect()
[27]:
[(4, (None, 5)), (1, (1, 2)), (2, (2, 3))]

1.2.15. Full outer join

The fullOuterJoin() function joins two pair RDDs like a SQL full-outer join. All records on the left and right will be returned; missing values on either side are filled with None.

[28]:
a = sc.parallelize([(1, 1), (2, 2), (3, 3)])
b = sc.parallelize([(1, 2), (2, 3), (4, 5)])
a.fullOuterJoin(b).collect()
[28]:
[(4, (None, 5)), (1, (1, 2)), (2, (2, 3)), (3, (3, None))]

1.2.16. Cogroup

The cogroup() function groups the values from two pair RDDs by key. For each key, we get back a pair of iterables: one with the values from the first RDD and one with the values from the second.

[29]:
a = sc.parallelize([(1, 1), (1, 2), (2, 3), (2, 4), (3, 5), (3, 6)])
b = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (3, 'e'), (3, 'f')])

for key, (it1, it2) in a.cogroup(b).collect():
    print(key, list(it1), list(it2))
1 [1, 2] ['a', 'b']
2 [3, 4] ['d', 'c']
3 [5, 6] ['e', 'f']

1.2.17. Cartesian

In Python, if we have two lists as follows and we want the Cartesian product of those two lists, we can use product from the itertools module.

[30]:
from itertools import product

a = [1, 2, 3, 4]
b = ['a', 'b', 'c', 'd']

list(product(*[a, b]))
[30]:
[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (1, 'd'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (2, 'd'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c'),
 (3, 'd'),
 (4, 'a'),
 (4, 'b'),
 (4, 'c'),
 (4, 'd')]

We can achieve the same using the cartesian() function on two RDDs.

[31]:
a = sc.parallelize([1, 2, 3, 4])
b = sc.parallelize(['a', 'b', 'c', 'd'])
a.cartesian(b).collect()
[31]:
[(1, 'a'),
 (1, 'b'),
 (2, 'a'),
 (2, 'b'),
 (1, 'c'),
 (1, 'd'),
 (2, 'c'),
 (2, 'd'),
 (3, 'a'),
 (3, 'b'),
 (4, 'a'),
 (4, 'b'),
 (3, 'c'),
 (3, 'd'),
 (4, 'c'),
 (4, 'd')]

1.2.18. Repartition

We can force our distributed dataset (the RDD) into a specified number of partitions using repartition().

[32]:
a = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
print(a.getNumPartitions())

a = a.repartition(4)
print(a.getNumPartitions())
2
4

If we want to see the elements in each partition, we can call the glom() function on the RDD. You can see that the data is skewed across the 4 requested partitions (some partitions are empty and others are not). In general, this is not what happens when calling repartition().

[33]:
a.glom().collect()
[33]:
[[], [5, 6, 7, 8], [], [1, 2, 3, 4]]

Rather, in general, calling repartition() will generate the specified number of partitions with roughly equal numbers of elements. We can inspect the result of repartition() as follows.

[34]:
import pandas as pd
import numpy as np

def get_stats(n_partitions, elements_per_partition):
    min_elements = min(elements_per_partition)
    max_elements = max(elements_per_partition)
    avg_elements = np.mean(elements_per_partition)
    std_elements = np.std(elements_per_partition)

    return {
        'n_partitions': n_partitions,
        'min_elements': min_elements,
        'max_elements': max_elements,
        'avg_elements': avg_elements,
        'std_elements': std_elements,
        'elements_per_partition': elements_per_partition
    }

a = sc.parallelize([i for i in range(10000)])

pd.DataFrame([get_stats(n_partitions, a.repartition(n_partitions).glom().map(len).collect()) for n_partitions in range(2, 10)])
[34]:
n_partitions min_elements max_elements avg_elements std_elements elements_per_partition
0 2 5000 5000 5000.000000 0.000000 [5000, 5000]
1 3 3330 3340 3333.333333 4.714045 [3330, 3330, 3340]
2 4 2500 2500 2500.000000 0.000000 [2500, 2500, 2500, 2500]
3 5 2000 2000 2000.000000 0.000000 [2000, 2000, 2000, 2000, 2000]
4 6 1660 1670 1666.666667 4.714045 [1670, 1670, 1670, 1660, 1660, 1670]
5 7 1420 1440 1428.571429 6.388766 [1420, 1420, 1430, 1430, 1430, 1430, 1440]
6 8 1250 1250 1250.000000 0.000000 [1250, 1250, 1250, 1250, 1250, 1250, 1250, 1250]
7 9 1100 1120 1111.111111 5.665577 [1120, 1110, 1110, 1110, 1110, 1110, 1100, 111...

1.2.19. Coalesce

We can also change the number of partitions of an RDD using coalesce(). Note that coalesce() only reduces the number of partitions (unless shuffle is enabled); asking for more partitions than currently exist leaves the partition count unchanged.

[35]:
a = sc.parallelize(['hello', 'world'])
print(a.getNumPartitions())

a = a.coalesce(2)
print(a.getNumPartitions())
2
2

So, repartition() and coalesce() seem to do the same thing: forcing the data into a specified number of partitions. What’s the difference?

  • repartition() incurs the cost of shuffling the data and creating new partitions; however, the resulting partitions are roughly equal in size.

  • coalesce() minimizes shuffling by merging existing partitions; however, the resulting partitions will most likely not be equal in size.

Which should you use? It depends on your goals and/or preferences. If you want computation to be evenly distributed, go for repartition(); otherwise, save time by not shuffling the data and use coalesce().
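
Here is a minimal sketch contrasting the two with glom(); the exact partition contents may differ from what the comments show.

a = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 4)
print(a.glom().collect())                 # e.g. [[1, 2], [3, 4], [5, 6], [7, 8]]
print(a.coalesce(2).glom().collect())     # existing partitions merged, no shuffle, e.g. [[1, 2, 3, 4], [5, 6, 7, 8]]
print(a.repartition(2).glom().collect())  # full shuffle; elements redistributed across 2 new partitions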

1.2.20. Pipe

The pipe() function enables you to specify an external script or program to transform the data. The script or program receives the records of each partition on standard input and must write its results to standard output. The script must be accessible on all the worker nodes; a common mistake is placing the script only on the driver node, which causes the piping to fail.

In the code below, the RDD has 12 partitions by default, and the script is launched once per partition, which is why One-Off Coder is echoed for each partition. Ten of the partitions have no data and only 2 of them do (one for hello and one for world). The script is just a simple echo and looks like the following.

#!/bin/sh
echo 'One-Off Coder'
while read LINE; do
    echo ${LINE}
done
[36]:
a = sc.parallelize(['hello', 'world'])
a.pipe('/root/ipynb/echo.sh').collect()
[36]:
['One-Off Coder', 'hello', 'One-Off Coder', 'world']

If we force the number of partitions to 2, then we get more sensible output.

[37]:
a = sc.parallelize(['hello', 'world']).repartition(2)
a.pipe('/root/ipynb/echo.sh').collect()
[37]:
['One-Off Coder', 'hello', 'One-Off Coder', 'world']

If we force the number of partitions to 1, then all the data will be fed to one instance of the script (that’s why we see One-Off Coder once only).

[38]:
a = sc.parallelize(['hello', 'world']).repartition(1)
a.pipe('/root/ipynb/echo.sh').collect()
[38]:
['One-Off Coder', 'world', 'hello']

1.2.21. Repartition and sort within partitions

For a pair RDD, we can control how many partitions we want and which records go into which partitions with repartitionAndSortWithinPartitions(). What we get for free is sorting within each partition. The arguments for repartitionAndSortWithinPartitions() are as follows.

  • numPartitions specifies the number of desired partitions

  • partitionFunc specifies how to assign records to partitions

  • ascending specifies whether to sort in ascending order

  • keyfunc specifies how to compute the sort key from each record's key

[39]:
sc.parallelize([(1, 5), (2, 15), (1, 4), (2, 14), (1, 3), (2, 13)])\
    .map(lambda tup: (tup, tup[1]))\
    .repartitionAndSortWithinPartitions(
        numPartitions=2,
        partitionFunc=lambda tup: tup[0] % 2)\
    .map(lambda tup: tup[0])\
    .collect()
[39]:
[(2, 13), (2, 14), (2, 15), (1, 3), (1, 4), (1, 5)]

1.3. Actions

Remember, transformations on RDDs create other RDDs and are lazily evaluated (no computational cost is incurred yet). On the other hand, when an action is applied to an RDD, computation is triggered and the result is a non-RDD value that is typically returned to the driver node.
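
A quick sketch makes the distinction concrete (reusing num_rdd from earlier): a transformation returns another RDD, while an action returns a plain Python object on the driver.

type(num_rdd.map(lambda x: x + 1))            # an RDD subclass; nothing has been computed yet
type(num_rdd.map(lambda x: x + 1).collect())  # list; the computation has run and the data is on the driver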

1.3.1. Reduce

The reduce() function collapses all the elements into one using a binary function, which should be commutative and associative.

[40]:
a = sc.parallelize([1, 2, 3])
a.reduce(lambda a, b: a + b)
[40]:
6

Reducing does not have to be a math operation like addition. Below, we merge the dictionaries into one.

[41]:
a = sc.parallelize([{'fname': 'john'}, {'lname': 'doe'}, {'age': 32}])
a.reduce(lambda a, b: {**a, **b})
[41]:
{'fname': 'john', 'lname': 'doe', 'age': 32}

We can also reduce data by selecting the smallest value.

[42]:
from random import randint

a = sc.parallelize([randint(10, 1000) for _ in range(100)])
a.reduce(lambda a, b: min(a, b))
[42]:
12

1.3.2. Collect

The collect() function is an action that we have been using all along. This function simply brings the distributed data back into one list on the driver. Be careful, though: if the data is huge, this operation may exhaust the driver's memory and fail.

[43]:
a = sc.parallelize([1, 2, 3])
a.collect()
[43]:
[1, 2, 3]

1.3.3. Count

The count() function counts the number of elements in an RDD.

[44]:
a = sc.parallelize([1, 2, 3])
a.count()
[44]:
3

Below, we generate 1,000 random numbers in the range \([1, 10]\). We then perform a map() operation creating a list of length \(x\) for each \(x\), followed by a flatMap() and then count().

[45]:
from random import randint

a = sc.parallelize([randint(1, 10) for _ in range(1000)])
a.map(lambda x: [x for _ in range(x)]).flatMap(lambda x: x).count()
[45]:
5521

1.3.4. First

The first() function always returns the first record of an RDD.

[46]:
a = sc.parallelize([1, 2, 3])
a.first()
[46]:
1

1.3.5. Take

We can bring back the first \(n\) records using take().

[47]:
a = sc.parallelize([1, 2, 3])
a.take(2)
[47]:
[1, 2]

1.3.6. Take sample

We can bring back random records using takeSample().

[48]:
a = sc.parallelize([i for i in range(100)])
a.takeSample(withReplacement=False, num=10, seed=37)
[48]:
[62, 46, 36, 88, 25, 22, 51, 16, 0, 52]

1.3.7. Take ordered

We can bring back the smallest \(n\) records in sorted order using takeOrdered().

[49]:
from random import randint

a = sc.parallelize([randint(1, 10000) for _ in range(1000)])
a.takeOrdered(10)
[49]:
[3, 6, 15, 46, 46, 62, 76, 79, 81, 84]
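
takeOrdered() also accepts an optional key function; as a quick sketch, negating the key brings back the largest values instead.

a.takeOrdered(10, key=lambda x: -x)  # the 10 largest values, in descending order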

1.3.8. Count by key

Counting the number of records associated with a key is accomplished through countByKey().

[50]:
a = sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])
a.countByKey()
[50]:
defaultdict(int,
            {5: 986,
             3: 1008,
             1: 995,
             9: 1021,
             2: 983,
             4: 982,
             6: 1035,
             8: 996,
             10: 999,
             7: 995})

1.4. Chaining transformations and actions

The power of transformations and actions emerges from chaining them together.

1.4.1. Map, filter, reduce

The three basic functions we encounter when starting to adopt functional programming are map(), filter(), and reduce(). Below, we map each number \(x\) to \(x \times x\), filter for only the even numbers, and then add the results.

[51]:
num_rdd = sc.parallelize([i for i in range(10)])

num_rdd\
    .map(lambda x: x * x)\
    .filter(lambda x: x % 2 == 0)\
    .reduce(lambda a, b: a + b)
[51]:
120

1.4.2. Filter, map, take

Here’s an example of parsing a CSV file. Note that we have to filter out the row starting with x, since that row is the header (for this CSV file). We then split (or tokenize) each line using a comma , as the delimiter. We finally convert all the tokens, which are strings, to integers. We take the first 10 records (rows) to check that we parsed the CSV file correctly.

[52]:
data_rdd = sc.textFile('hdfs://localhost/data.csv')

data_rdd\
    .filter(lambda s: not s.startswith('x'))\
    .map(lambda s: s.split(','))\
    .map(lambda arr: [int(s) for s in arr])\
    .take(10)
[52]:
[[41, 82, 16, 51, 11, 85, 46, 50, 1, 55],
 [16, 24, 44, 88, 48, 33, 69, 51, 50, 90],
 [49, 62, 57, 9, 76, 15, 46, 39, 53, 29],
 [66, 18, 85, 48, 27, 76, 74, 8, 95, 64],
 [28, 6, 41, 46, 94, 75, 41, 4, 100, 32],
 [72, 57, 47, 69, 90, 10, 42, 46, 35, 65],
 [64, 9, 66, 75, 63, 35, 22, 86, 75, 34],
 [39, 87, 12, 58, 73, 26, 15, 32, 99, 23],
 [82, 33, 6, 22, 13, 100, 99, 18, 52, 32],
 [80, 30, 21, 1, 47, 56, 70, 65, 98, 27]]

1.4.3. Merging dictionaries

We already saw some examples of merging dictionaries. Here’s another example.

[53]:
sc.parallelize([(randint(1, 10), 1) for _ in range(10000)])\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda tup: {tup[0]: tup[1]})\
    .reduce(lambda a, b: {**a, **b})
[53]:
{4: 964,
 2: 974,
 10: 989,
 8: 1009,
 6: 991,
 1: 993,
 5: 1021,
 9: 1023,
 7: 1018,
 3: 1018}

1.5. Broadcasting variables

If we have data that needs to be shared across the worker nodes, we can broadcast that data. Below, we have a dictionary m that is local to the driver, and we want to broadcast it (make it available) to all the worker nodes. We broadcast m with sc.broadcast() and assign the reference to b; note that b wraps the data m, and we can access the dictionary through the value property of b (e.g. b.value). Now our parallel operations can access the dictionary.

[54]:
from random import randint

m = {i: randint(1, 10) for i in range(101)}
b = sc.broadcast(m)

sc.parallelize([randint(1, 100) for _ in range(20000)])\
    .map(lambda num: (b.value[num], 1))\
    .reduceByKey(lambda a, b: a + b)\
    .collect()
[54]:
[(4, 2403),
 (6, 1792),
 (8, 1562),
 (2, 1795),
 (10, 1541),
 (5, 2654),
 (9, 1463),
 (3, 1704),
 (7, 3432),
 (1, 1654)]
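
When the broadcast variable is no longer needed, its cached copies on the executors can be released; a one-line sketch:

b.unpersist()  # delete the cached copies of the broadcast b on the executors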

1.6. Accumulator

If we want to keep count of things or put metrics on our operations, we can use an accumulator. An accumulator is defined locally (on the driver), but tasks running on the worker nodes may add to it (only the driver can read its value). Below, we use an accumulator to simply keep track of the number of map operations.