8. Tips
8.1. Fine-tuning
Fine-tuning your Spark jobs is one of the most difficult things you will do, as doing so requires intimate knowledge of your cluster and data properties. For example, your jobs may be slow because the hardware supporting your cluster is limited (e.g. memory, CPU, GPU, hard disk, network) or because your data is not partitioned correctly (e.g. your partition scheme has generated many small files). You may overcome some of the side effects of your cluster limitations or data properties by configuring your Spark job, but not all problems can be resolved through configuration alone. For example, if you have too many very small partitions, no amount of configuration can eliminate the computational overhead incurred by the many-files problem; you will have to repartition your files into more reasonable sizes (e.g. at least 128 MB). Fine-tuning is more art than science, and it is error-prone.
8.1.1. Spark configuration, hardware
Make sure you understand the concepts involved in a Spark application. There is a duality in Spark's terminology because both hardware and software are at play; some confusion arises from calling the same thing by different names depending on whether we are looking at the cluster and its components from a hardware or a software point of view.
application
: A Spark application uses Spark to do computational work. At a high level, a Spark application involves a driver and many executors to distribute and coordinate computational work. A Spark application is software.

driver
: The driver program usually refers to the point of entry in your Spark application where work is defined and submitted to the executors in the cluster. Usually, the driver refers to the physical node in your cluster where you kick off your distributed computations. The driver may be synonymous with the master of the cluster. There is only 1 driver in a Spark application; equivalently, there is only 1 master node in a Spark cluster. You can think of driver and master as the same thing viewed from different angles: driver refers to the software and master refers to the hardware (the single controlling node).

worker
: Worker nodes are the nodes in the cluster where computational work is assigned and performed. There are usually many worker nodes in a Spark cluster, but there must always be at least 1 worker node. Worker nodes are also called slave nodes or just slaves.

executor
: An executor is a process on a worker node that executes computational tasks. A single executor on a worker node may execute more than one task, if configured to do so. Furthermore, a worker node can have more than one executor.

job
: A job is the computational work resulting from invoking a Spark action. A job is composed of many stages. A Spark application or driver program will typically kick off many jobs.

stage
: A phase of a job. A job is decomposed into stages depending on a few factors, including whether the computational work may be done in parallel or serially. The decomposition of a job into multiple stages is influenced by shuffling and computational boundaries. A stage is composed of many tasks.

task
: A task is the smallest unit of work. A task runs inside an executor and usually maps to a data partition. The transformation code you write in Spark will end up being executed as a task after you invoke an action.
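To make these terms concrete, here is a minimal PySpark sketch (the application name and numbers are arbitrary). The transformations are lazy and only build a plan; invoking the count() action kicks off a job, which Spark decomposes into stages at shuffle boundaries and runs as tasks, one per partition, inside the executors.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("terminology-demo").getOrCreate()

# Transformations are lazy; this line only builds an execution plan.
df = spark.range(1_000_000).filter("id % 2 == 0")

# The action below kicks off a job. The job is decomposed into stages,
# and each stage is executed as tasks (one per partition) on the executors.
print(df.count())

spark.stop()
```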
One way to optimize or fine-tune your Spark application is to look at the CPU and memory resources of the nodes in your cluster and configure your Spark application appropriately. Not all configurations of a Spark application can be changed at runtime. Configurations that may be changed at runtime are called runtime configurations, and those that can only be set when the application is deployed (e.g. through spark-submit) are called deployment configurations. The most influential configurations for fine-tuning your Spark applications are the following.
spark.executor.instances
: Specifies the number of executors per worker node. According to the documentation, this property is a deployment configuration (not a runtime configuration).

spark.executor.cores
: Specifies the number of (CPU) cores per executor.

spark.executor.memory
: Specifies the amount of memory per executor.
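As a rough sketch of where these settings go (the values below are placeholders, not recommendations), deployment configurations are typically supplied when the application is submitted, while runtime configurations may also be changed on a live session.

```python
from pyspark.sql import SparkSession

# Deployment configurations are usually passed at submit time, e.g.
#   spark-submit --conf spark.executor.instances=2 \
#                --conf spark.executor.cores=4 \
#                --conf spark.executor.memory=8g my_app.py
# They may also be set on the builder before the session is created.
spark = (
    SparkSession.builder
    .appName("tuning-demo")
    .config("spark.executor.instances", "2")   # placeholder value
    .config("spark.executor.cores", "4")       # placeholder value
    .config("spark.executor.memory", "8g")     # placeholder value
    .getOrCreate()
)

# Runtime configurations can still be changed on the live session.
spark.conf.set("spark.sql.shuffle.partitions", "200")
```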
To simplify the explanation, we assume that all nodes are identical (e.g. the master/slave nodes, or the driver/worker nodes, if you prefer these latter terms). Let’s just look at the worker nodes and also assume there are 10 of them. The specification of the cluster is as follows.
10 nodes
16 cores/node
64GB RAM/node
This cluster specification means that there are 160 cores and 640GB of RAM in the cluster. At one extreme, we can set the following configurations.
spark.executor.instances
: 16

spark.executor.cores
: 1

spark.executor.memory
: 4GB
This configuration means that each worker node will have 16 executors, where each executor uses 1 core and 4GB of memory. There are a few problems with this configuration.
Since an executor has only 1 core, only 1 task can run per executor.
Broadcast variables and accumulators will be replicated 16 times (once per executor) on the same worker node.
No overhead memory is reserved.
At the other extreme, we can set the following configurations.
spark.executor.instances
: 1

spark.executor.cores
: 16

spark.executor.memory
: 64GB
This configuration means that each worker will have 1 executor, where each executor will be using 16 cores and 64GB of memory. There are still problems with this set of configurations.
Excessive garbage collection as a result of too much memory for each executor.
No overhead memory is reserved.
Here’s a happy medium between the previous two sets of configurations.
spark.executor.instances
: 3

spark.executor.cores
: 5

spark.executor.memory
: 19GB
This configuration means that each worker will have 3 executors, where each executor uses 5 cores and 19GB of memory. Remember, there are 16 cores per worker, but in this set of configurations we use only (3 x 5 =) 15 cores; we save 1 core for overhead daemons. Also, there is 64GB of memory per worker, but we use only (3 x 19 =) 57GB of memory; we save 7GB for memory overhead.
Let’s work out an example for a cluster using 10 m5.2xlarge instances. Here’s the cluster specification.
10 nodes
8 cores/node
32GB RAM/node
An ideal configuration might be as follows.
spark.executor.instances
: 2

spark.executor.cores
: 3

spark.executor.memory
: 14GB
For this set of configurations, we use only 6 of the 8 cores per worker and 28GB of the 32GB of RAM per worker.
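The arithmetic behind these two worked examples can be captured in a small helper. This is only a sketch of the rule of thumb used above; the function name, the one-core reservation, and the 7% memory-overhead fraction are assumptions, not an official Spark formula.

```python
def size_executors(cores_per_node: int, ram_gb_per_node: int,
                   cores_per_executor: int = 5,
                   overhead_fraction: float = 0.07) -> dict:
    """Rough executor sizing per worker node (assumed rule of thumb)."""
    usable_cores = cores_per_node - 1                 # reserve 1 core for daemons
    executors_per_node = usable_cores // cores_per_executor
    ram_per_executor = ram_gb_per_node // executors_per_node
    heap_gb = int(ram_per_executor * (1 - overhead_fraction))  # leave memory overhead
    return {
        "spark.executor.instances": executors_per_node,
        "spark.executor.cores": cores_per_executor,
        "spark.executor.memory": f"{heap_gb}g",
    }

print(size_executors(16, 64))                       # 3 executors, 5 cores, 19g
print(size_executors(8, 32, cores_per_executor=3))  # 2 executors, 3 cores, 14g
```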
8.1.2. Data partition
For Spark, you should aim for partition sizes of around 128MB (older versions of Hadoop defaulted to a 64MB block size; newer versions default to 128MB). In practice, target partition sizes between 128MB and 1GB.
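As a minimal sketch (the paths and the dataset size here are made up), one way to hit a target partition size is to estimate a partition count from the total data size and repartition before writing.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-demo").getOrCreate()

df = spark.read.parquet("s3://my-bucket/events/")  # hypothetical input path

# Aim for roughly 128MB per partition; the total size is assumed to be
# known or measured elsewhere (e.g. from the file listing).
total_size_bytes = 64 * 1024**3          # pretend the dataset is 64GB
target_partition_bytes = 128 * 1024**2   # ~128MB per partition
num_partitions = max(1, total_size_bytes // target_partition_bytes)

df.repartition(num_partitions).write.mode("overwrite").parquet(
    "s3://my-bucket/events-repartitioned/"  # hypothetical output path
)
```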