Optimising performance of Spark Streaming applications on AWS EMR
by Neha Kaul, Senior Consultant in our Sydney team
Altis recently delivered a real-time analytics platform using Apache Spark Streaming on AWS EMR, with data streamed in real time from Amazon Kinesis Streams.
In this blog post we provide insights into the key optimisation techniques that were applied to improve performance. These techniques include:
- Data partitioning
- Spark and Yarn resource allocation
- Auto Scaling for EMR
- Scaling Kinesis Streams
Data partitioning
Data partitioning in the context of a distributed system refers to dividing the data into multiple parts across the cluster. Minimising network traffic in a cluster is a key factor in achieving high performance, and increasing parallelism is one way to do that. Partitioning the data with the right number of partitions helps increase parallelism. Spark Streaming partitions the RDDs by default and stores the data across the worker nodes. The default partition count is calculated by dividing the batch interval (in our case 4 seconds) by the block interval (200ms by default), which gives us 20. It therefore becomes essential to validate the number of partitions that is ideal for the solution. Too few partitions may leave some cores in the cluster unused, whereas too many partitions add overhead in scheduling and maintaining tasks.
For example, take a 40-core cluster with 50 partitions. If each partition takes 3 seconds to process, the 40 cores will process the first 40 partitions in parallel in 3 seconds. The remaining 10 partitions are queued for those 3 seconds; as soon as the cores become available again, the 10 partitions are processed while the other 30 cores sit idle. This tells us that the number of partitions should be at least equal to the number of cores in the cluster to utilise the cluster fully and improve performance. Depending on how long each partition takes to process, this number can be increased to 2-3 times the number of cores until an optimal balance between processing time and concurrency is achieved. In Spark Streaming, the partition count can be modified with DStream.repartition() or by setting the spark.default.parallelism property.
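The queueing arithmetic in this example can be sketched as a quick calculation (a hypothetical helper for illustration, not part of Spark):

```python
import math

def processing_time(num_partitions, num_cores, seconds_per_partition):
    """Wall-clock time when partitions are processed in waves of `num_cores`."""
    waves = math.ceil(num_partitions / num_cores)
    return waves * seconds_per_partition

print(processing_time(50, 40, 3))   # 6 seconds: a second wave runs for just 10 partitions
print(processing_time(40, 40, 3))   # 3 seconds: one fully utilised wave
print(processing_time(120, 40, 3))  # 9 seconds: 3 partitions per core, all cores busy
```

In the actual streaming job the partition count itself is set with DStream.repartition() or spark.default.parallelism; the helper only illustrates why partition counts that are a multiple of the core count keep every core busy.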
Spark executor memory and CPU allocation
Spark is known to be highly resource intensive in its memory and CPU usage; as such, resource allocation configuration at both the Spark and Yarn layers can have a critical impact on overall performance.
Understanding how Yarn manages the running of a Spark application is key in being able to arrive at the ideal resource configuration for your application.
Each Spark application has a driver process and multiple executor processes, running across the nodes in the cluster, that execute the tasks. When running on EMR, the Spark executors run in Yarn containers. A Yarn container is an allocation of CPU cores and memory. The first container that starts up holds the Application Master process, which requests resources from Yarn and triggers the NodeManagers on each node to start the executors.
The executors on each node require a number of cores and an amount of memory to execute their tasks. These numbers depend on the type of workload and the resources available: some jobs need more memory while others need more CPU. Two factors drive the calculation: the resources that are available, and the optimal number of cores per executor. HDFS throughput deteriorates when more than 5 cores are assigned per executor, so we can start deriving the rest of the numbers from a baseline of 2 cores per executor. Below is an example showing how the numbers for each critical resource can be derived.
Assuming there are 4 nodes in a cluster with 10 cores per node and 64GB RAM per node:
- Number of executors per node = 5 executors (divide the total of 10 cores by the number of cores per executor which we set at 2)
- Number of executors in total = 20 (5 executors multiplied by the total of 4 nodes)
- Memory per executor = ~12GB (64GB per node divided by the 5 executors on that node, leaving some headroom for the OS and Yarn overhead)
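The worked example above can be expressed as a small sizing helper (a sketch under the same assumptions; a production cluster would also reserve a core and some memory on each node for the OS and Hadoop daemons, which this omits):

```python
def executor_plan(nodes, cores_per_node, mem_per_node_gb, cores_per_executor=2):
    """Divide each node's cores and memory evenly among its executors."""
    execs_per_node = cores_per_node // cores_per_executor
    total_executors = execs_per_node * nodes
    mem_per_executor_gb = mem_per_node_gb // execs_per_node
    return execs_per_node, total_executors, mem_per_executor_gb

# 4 nodes, 10 cores and 64GB RAM per node, 2 cores per executor
print(executor_plan(4, 10, 64))  # (5, 20, 12)
```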
The number of cores can be increased or decreased based on observed core usage, which means the number of executors and the memory per executor must be adjusted accordingly.
Monitoring resource usage
The Spark Web UI can be used to monitor jobs and resource usage, to ensure the numbers above are optimal for the application.
To identify how much memory a data set requires, create a single RDD and cache it in memory. Once it is cached, check 'Size in Memory' under the Storage tab of the Spark Web UI, as shown in the image below (the figure shown is for a single text file). On top of this there is the additional cost of accessing these objects in memory, as well as the garbage collection associated with them.
If tasks are being queued (in the Spark Web UI Stages tab) as shown below, this could indicate a need to take advantage of the parallelism offered by Spark and increase the number of executors.
For example if 200 tasks are pending, each task requires 1 core and a single executor has 5 cores then 40 executors are required to run all the tasks in parallel.
Enabling the spark.dynamicAllocation.enabled property allows Spark to add and remove executors dynamically based on the workload. When using Spark Streaming, ensure the executor idle timeout (spark.dynamicAllocation.executorIdleTimeout) is greater than the batch interval, so that only genuinely unused executors are removed from the cluster rather than executors that are merely idle between batches.
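A sketch of the relevant spark-defaults.conf entries (the values are illustrative for a 4-second batch interval; the executor bounds are assumptions, not values from the project):

```
spark.dynamicAllocation.enabled               true
spark.dynamicAllocation.executorIdleTimeout   60s
spark.dynamicAllocation.minExecutors          2
spark.dynamicAllocation.maxExecutors          20
```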
If the job as a whole is taking longer than expected, increase parallelism by increasing the number of cores per executor. However, more than 5 cores per executor can lead to poor performance due to increased HDFS I/O contention.
The above examples explain the memory and cores required for the executor processes in Spark.
Spark driver memory and CPU allocation
The memory and cores required for the driver process depends on the work that the driver will be performing.
The driver process is responsible for scheduling the tasks and managing the flow of the jobs. By default the driver is allocated 1GB of memory. Typically, if the Spark job is not collecting a large dataset and sending it back to the driver (for example via collect()), 1GB should be sufficient. If out-of-memory exceptions are observed, try increasing the driver memory.
Ideally the executors should receive the bulk of the cluster's resources, so the driver process should be kept to the minimum. The same applies to cores: if the computation performed by the driver is not intensive, 1 core for the driver should be sufficient.
Yarn resource allocation
Spark asks Yarn for the memory and cores to execute a job, so we need to ensure that Yarn itself has sufficient resources to give Spark. In Yarn, the memory of a single executor container is divided into the Spark executor memory plus overhead memory (spark.yarn.executor.memoryOverhead). The overhead is off-heap memory used for VM overheads and other native overheads.
This overhead is the larger of 384MB and 10% of the executor memory. It needs to be taken into account when sizing the executor containers (container memory = executor memory + max(384MB, executor memory × 0.10)). Similarly, the final memory allocated to the driver process also includes spark.yarn.driver.memoryOverhead, again the larger of 384MB and 10% of the driver memory.
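The container sizing above can be sketched as a small calculation (a hypothetical helper mirroring the max(384MB, 10%) rule):

```python
def container_memory_mb(executor_memory_mb, overhead_fraction=0.10, overhead_floor_mb=384):
    """Total Yarn container size: executor memory plus off-heap overhead,
    where the overhead is the larger of 384MB and 10% of executor memory."""
    overhead = max(overhead_floor_mb, int(executor_memory_mb * overhead_fraction))
    return executor_memory_mb + overhead

print(container_memory_mb(12288))  # 13516: 10% (1228MB) exceeds the 384MB floor
print(container_memory_mb(2048))   # 2432: the 384MB floor applies
```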
The property that allocates the minimum memory per container in Yarn is yarn.scheduler.minimum-allocation-mb. The executor memory can be larger than this value but should be less than yarn.scheduler.maximum-allocation-mb.
For code optimisation (depending on the memory available) ensure the RDDs that are repeatedly being used throughout the application are cached. This saves the expensive re-computation that occurs each time the code is executed as the RDD can be read from memory. To ensure that the RDDs are not occupying too much memory in cache, in the Spark Web UI check the size of the cache in memory under the Storage tab.
Auto scaling for EMR
The workload in a streaming application can vary, so being able to scale in and scale out the resources to respond to the varying demands becomes critical. EMR allows us to automate the scaling in and scaling out of the nodes in a cluster based on utilisation metrics that are sent to CloudWatch.
Two key metrics that have proven to be useful are YARNMemoryAvailablePercentage and ContainerPendingRatio. For YARNMemoryAvailablePercentage add or remove instances when this percentage is above or below a specified number. For example, a rule can be created when there is less than 20% of YARNMemoryAvailablePercentage for a period of 400 seconds then the instance group should scale out by adding 1 more instance to the cluster. Similarly specify that an instance should be removed if YARNMemoryAvailablePercentage is greater than 70% for a length of 200 seconds.
The other metric, ContainerPendingRatio, is calculated by dividing the number of Yarn containers pending creation by the total containers allocated. A high ratio means work is backing up, so specify that when this ratio is greater than, say, 0.70 (70 pending / 100 allocated) the instance group should scale out by adding 2 instances. Similarly, scaling in of the instance group could be triggered when the ratio falls below 0.30 (30 pending / 100 allocated).
This auto scaling policy should also specify the MaxCapacity and MinCapacity of instances, where the scaling out of instances will stop when the instances hit the MaxCapacity number and scaling in of instances will stop when the instances hit the MinCapacity.
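Put together, an EMR automatic scaling policy expressing the YARNMemoryAvailablePercentage scale-out rule might look like this sketch (the rule name, cooldown, and exact thresholds are illustrative):

```json
{
  "Constraints": { "MinCapacity": 2, "MaxCapacity": 10 },
  "Rules": [
    {
      "Name": "ScaleOutOnLowYarnMemory",
      "Action": {
        "SimpleScalingPolicyConfiguration": {
          "AdjustmentType": "CHANGE_IN_CAPACITY",
          "ScalingAdjustment": 1,
          "CoolDown": 300
        }
      },
      "Trigger": {
        "CloudWatchAlarmDefinition": {
          "MetricName": "YARNMemoryAvailablePercentage",
          "ComparisonOperator": "LESS_THAN",
          "Threshold": 20,
          "Period": 300,
          "EvaluationPeriods": 1,
          "Statistic": "AVERAGE",
          "Unit": "PERCENT"
        }
      }
    }
  ]
}
```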
Scaling Kinesis streams
Spark Streaming uses the Amazon Kinesis Client Library (KCL) to consume data from a Kinesis stream. A single stream is composed of one or more shards; a shard is a uniquely identified sequence of data records in a stream. A single shard provides a capacity of 1MB/sec data input and 2MB/sec data output, and supports up to 1,000 put records per second.
In order for Spark to process data in real time, the Kinesis stream needs to maintain a high throughput. If the throughput requirements of the solution exceed 1MB/sec of writes or 2MB/sec of reads per shard, additional shards need to be added to the stream.
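A minimal way to estimate the shard count from the per-shard limits above (a hypothetical helper; real resharding decisions should also leave headroom for traffic spikes):

```python
import math

def shards_required(write_mb_per_sec, read_mb_per_sec, records_per_sec):
    """Minimum shard count: each shard supports 1MB/s in, 2MB/s out,
    and 1000 put records per second; the tightest limit wins."""
    return max(
        math.ceil(write_mb_per_sec / 1.0),
        math.ceil(read_mb_per_sec / 2.0),
        math.ceil(records_per_sec / 1000.0),
    )

print(shards_required(3.5, 5.0, 2400))  # 4 (write throughput is the binding limit)
```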
In order to read the data from multiple shards, increase the number of Kinesis receivers on the Spark side. Note that each Kinesis receiver is a long-running task that occupies a core, so ensure each executor has enough cores allocated to run its receiver as well as process the data.
The data that is pushed into Kinesis by the source system also needs to be well distributed, so that all shards receive data evenly and no single shard becomes a bottleneck. This can be achieved by using a random 'partition' key. The partition key is included in the put request to Kinesis and determines the shard to which the record is sent.
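A sketch of building such a put request with a random partition key (the stream name is hypothetical; the dictionary mirrors the parameters a Kinesis PutRecord call expects, but no API call is made here):

```python
import uuid

def build_put_record(data: bytes, stream_name: str) -> dict:
    """Request parameters for a Kinesis PutRecord call, using a random
    partition key so records spread evenly across the shards."""
    return {
        "StreamName": stream_name,
        "Data": data,
        "PartitionKey": str(uuid.uuid4()),  # random key -> even shard distribution
    }

# Two records get different partition keys, so they can land on different shards
r1 = build_put_record(b"event-1", "my-stream")
r2 = build_put_record(b"event-2", "my-stream")
print(r1["PartitionKey"] != r2["PartitionKey"])  # True
```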
This blog described some of the techniques we applied to optimise the performance of Spark Streaming applications on EMR.
If you or your team are experiencing performance issues with Spark applications, get in touch with us to see how we can help you.