8. Tips

8.1. Fine-tuning

Fine-tuning your Spark jobs is one of the most difficult things you will do, because doing it well requires intimate knowledge of both your cluster and your data. For example, your jobs may be slow because the hardware supporting your cluster is limited (e.g. memory, CPU, GPU, disk, network), or because your data is not partitioned sensibly (e.g. your partitioning scheme has generated many small files). You may overcome some of the side-effects of cluster limitations or data properties by configuring your Spark job, but not every problem can be resolved through configuration alone. For example, if you have too many very small partitions, no amount of configuration can remove the computational overhead incurred by the many-files problem; you will have to repartition your files into more reasonable sizes (e.g. at least 128 MB). Fine-tuning is more art than science, and it is error-prone.

8.1.1. Spark configuration, hardware

Make sure you understand the concepts involved in a Spark application. There is a duality to many of the terms in Spark because both hardware and software are at play; some of the confusion arises from calling the same thing by different names depending on whether we are looking at the cluster and its components from a hardware or a software point of view.

  • application: A Spark application uses Spark to do computational work. At a high level, a Spark application involves a driver and many executors to distribute and coordinate computational work. A Spark application is software.

  • driver: The driver program is the point of entry of your Spark application, where work is defined and submitted to the executors in the cluster. The term driver is also often used for the physical node in your cluster from which you kick off your distributed computations; in that sense the driver is synonymous with the master of the cluster. There is only 1 driver in a Spark application, just as there is only 1 master node in a Spark cluster. You can think of driver and master as the same thing viewed from different angles: driver refers to the software and master to the hardware (the single controlling node).

  • worker: Worker nodes are the nodes in the cluster to which computational work is actually assigned and where it is carried out. There are usually many worker nodes in a Spark cluster, but there must always be at least 1. Worker nodes are also called slave nodes, or just slaves.

  • executor: An executor is a process on a worker node that executes computational tasks. A single executor on a worker node may execute more than one task, if configured to do so. Furthermore, a worker node can have more than one executor.

  • job: A job is the computational work that results from invoking a Spark action. A job is composed of many stages. A Spark application or driver program will typically kick off many jobs.

  • stage: A stage is a phase of a job. A job is decomposed into stages based on a few factors, including whether the computational work can be done in parallel or must be done serially; the decomposition is influenced by shuffle and computational boundaries. A stage is composed of many tasks.

  • task: A task is the smallest unit of work. A task runs inside an executor and usually maps to a single data partition. The transformation code you write in Spark ends up being executed as tasks once you invoke an action (see the short sketch after this list).

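To make the software-side terms concrete, here is a minimal PySpark sketch (the application name and numbers are arbitrary). The transformations only define work; it is the count() action that submits a job, which Spark decomposes into stages and tasks that run inside executors.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("concepts-demo").getOrCreate()

    df = spark.range(1_000_000)                 # lazy: defines data, submits no job
    doubled = df.selectExpr("id * 2 AS x")      # transformation: still no job
    n = doubled.filter("x % 4 = 0").count()     # action: submits a job made of stages and tasks
    print(n)

    spark.stop()
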
One way to optimize or fine-tune your Spark application is to look at the CPU and memory resources of the nodes in your cluster and configure your Spark application accordingly. Not all configurations of a Spark application can be changed at runtime; only those configurations that are acknowledged during spark-submit may be configured then. Configurations that may be changed at runtime are called runtime configurations, and those that can only be set when the cluster is deployed are called deployment configurations. The most influential configurations for fine-tuning your Spark applications are the following (a short sketch showing how they might be set appears after the list).

  • spark.executor.instances: Specifies the total number of executors requested for the application (when dynamic allocation is disabled). According to the documentation, this property is a deployment configuration (not a runtime configuration).

  • spark.executor.cores: Specifies the number of (CPU) cores per executor.

  • spark.executor.memory: Specifies the amount of memory per executor.

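For illustration, here is one way these properties might be set from a PySpark application before the session is created. The values are placeholders taken from the worked example later in this section; depending on your cluster manager and deploy mode, some of them (notably spark.executor.instances) may instead need to be supplied at spark-submit time, e.g. via --conf.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("tuned-app")
        .config("spark.executor.instances", "30")  # total executors requested (e.g. 3 per node on 10 nodes)
        .config("spark.executor.cores", "5")       # cores per executor
        .config("spark.executor.memory", "19g")    # heap memory per executor
        .getOrCreate()
    )
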
To simplify the explanation, we assume that all nodes are identical (e.g. the master/slave nodes, or the driver/worker nodes, if you prefer these latter terms). Let’s just look at the worker nodes and also assume there are 10 of them. The specification of the cluster is as follows.

  • 10 nodes

  • 16 cores/node

  • 64GB RAM/node

This cluster specification means that there are 160 cores and 640GB of RAM in the cluster. At one extreme, we could set the following configuration.

  • spark.executor.instances: 160

  • spark.executor.cores: 1

  • spark.executor.memory: 4GB

This configuration requests 160 executors in total, or 16 per worker node, where each executor uses 1 core and 4GB of memory. There are a few problems with this configuration.

  • Since an executor has only 1 core, only 1 task can run per executor.

  • Broadcast variables and accumulators will be replicated 16 times (once per executor) on the same worker node.

  • No overhead memory is reserved.

At the other extreme, we could set the following configuration.

  • spark.executor.instances: 10

  • spark.executor.cores: 16

  • spark.executor.memory: 64GB

This configuration requests 10 executors in total, or 1 per worker node, where each executor uses 16 cores and 64GB of memory. There are still problems with this set of configurations.

  • Excessive garbage collection as a result of too much memory for each executor.

  • No overhead memory is reserved.

Here’s a happy medium between the previous two sets of configurations.

  • spark.executor.instances: 30

  • spark.executor.cores: 5

  • spark.executor.memory: 19GB

This configuration requests 30 executors in total, or 3 per worker node, where each executor uses 5 cores and 19GB of memory. Remember, there are 16 cores per worker, but in this configuration we use only (3 x 5 =) 15 of them; we save 1 core for overhead daemons. Similarly, there is 64GB of memory per worker, but we use only (3 x 19 =) 57GB of it; we save 7GB for memory overhead. A sketch of this arithmetic follows below.

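One way to arrive at numbers like these is a back-of-the-envelope calculation. The sketch below is a hypothetical helper (the function name, the single reserved core, and the 10% memory overhead figure are assumptions of this example, not Spark settings) that reproduces the arithmetic above.

    def size_executors(cores_per_node, ram_gb_per_node, cores_per_executor,
                       reserved_cores=1, overhead_fraction=0.10):
        """Rough executor sizing: reserve a core per node for OS/cluster daemons,
        pack executors by core count, and leave ~10% of each executor's share
        of RAM for memory overhead."""
        usable_cores = cores_per_node - reserved_cores
        executors_per_node = usable_cores // cores_per_executor
        memory_per_executor_gb = int((ram_gb_per_node / executors_per_node) * (1 - overhead_fraction))
        return executors_per_node, memory_per_executor_gb

    # 16 cores and 64GB RAM per node, 5 cores per executor (the example above)
    executors_per_node, mem_gb = size_executors(16, 64, 5)
    # executors_per_node = 3, mem_gb = 19; with 10 nodes, 30 executors in total
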
Let’s work out an example for a cluster using 10 m5.2xlarge instances. Here’s the cluster specification.

  • 10 nodes

  • 8 cores/node

  • 32GB RAM/node

An ideal configuration might be as follows.

  • spark.executor.instances: 20

  • spark.executor.cores: 3

  • spark.executor.memory: 14GB

With this set of configurations, each worker node runs 2 executors, using only 6 of its 8 cores and 28GB of its 32GB of RAM.

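Reusing the hypothetical size_executors helper sketched earlier (with 3 cores per executor, since these nodes are smaller), the same arithmetic reproduces this configuration.

    # m5.2xlarge workers: 8 cores and 32GB RAM per node
    executors_per_node, mem_gb = size_executors(8, 32, cores_per_executor=3)
    # executors_per_node = 2, mem_gb = 14; with 10 nodes, 20 executors in total
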
8.1.2. Data partition

For Spark, you should aim for partition sizes of roughly 128MB (for comparison, older Hadoop versions defaulted to a 64MB block size). In practice, target partition sizes between 128MB and 1GB.
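
A rough way to hit that target is to estimate the number of partitions from the total input size and repartition accordingly. The sketch below is a hypothetical helper (the function name and the 10GB figure in the usage comment are illustrative); note that when reading from files, the built-in spark.sql.files.maxPartitionBytes setting (128MB by default) plays a similar role.

    import math

    def repartition_to_target(df, total_input_bytes, target_partition_bytes=128 * 1024 * 1024):
        """Repartition a DataFrame so each partition holds roughly
        target_partition_bytes (~128MB by default). The caller must estimate
        total_input_bytes, e.g. from the size of the source files."""
        num_partitions = max(1, math.ceil(total_input_bytes / target_partition_bytes))
        return df.repartition(num_partitions)

    # e.g. for roughly 10GB of input: df = repartition_to_target(df, 10 * 1024 ** 3)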