As a memory-based distributed computing engine, Spark's memory management module plays a very important role in the overall system. Understanding the fundamentals of Spark memory management helps both in developing Spark applications and in tuning performance. This article covers two topics in detail: the first part introduces Spark's on-heap and off-heap memory planning, including on-heap memory, off-heap memory, and the memory management interface; the second part focuses on the allocation of Spark memory space, covering the static memory management and unified memory management mechanisms.
This article aims to lay out the landscape of Spark memory management and serve as a starting point for deeper discussion of the topic. The principles described here are based on Spark 2.1. Readers should have some grounding in Spark and Java, and understand concepts such as RDD, Shuffle, and the JVM.
When a Spark application is executed, the Spark cluster starts two kinds of JVM processes: the Driver and the Executors. The Driver is the main control process, responsible for creating the Spark context, submitting Spark jobs (Jobs), converting jobs into computing tasks (Tasks), and coordinating task scheduling among the Executor processes. Executors are responsible for executing specific computing tasks on the worker nodes, returning results to the Driver, and providing storage for RDDs that need to be persisted. Since Driver memory management is relatively simple, this article mainly analyzes Executor memory management; "Spark memory" below refers specifically to Executor memory.
Figure 1 Spark’s Driver and Worker
On-heap and Off-heap Memory Planning
As a JVM process, an Executor's memory management is built on top of the JVM's memory management. Spark subdivides the JVM's on-heap space in finer detail to make full use of memory. At the same time, Spark introduces off-heap memory, which allows it to allocate space directly in the system memory of worker nodes, further optimizing memory usage.
Figure 2 Off-heap and on-heap memory
The size of on-heap memory is configured by the --executor-memory or spark.executor.memory parameter when the Spark application starts. Concurrent tasks running within an Executor share this JVM heap: the memory these tasks occupy when caching RDDs and broadcast data is planned as storage memory, the memory they occupy when executing Shuffle is planned as execution memory, and the rest is not specially planned; object instances inside Spark, as well as object instances in user-defined Spark application code, all occupy this remaining space. Depending on the management mode, these three parts occupy different amounts of space (described below).
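A minimal configuration sketch (the app name and the 4g value are illustrative assumptions):

```scala
import org.apache.spark.SparkConf

// Set the Executor's on-heap size; equivalent to passing
// --executor-memory 4g to spark-submit.
val conf = new SparkConf()
  .setAppName("memory-demo")            // hypothetical app name
  .set("spark.executor.memory", "4g")   // JVM heap per Executor
```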
Spark's management of on-heap memory is a logical, "planned" management, because the allocation and release of the memory occupied by object instances are done by the JVM; Spark can only record this memory after allocation and before release. Let's look at the specific process:
Requesting memory:
- Spark creates (new) an object instance in code
- The JVM allocates space from on-heap memory, creates the object, and returns the object reference
- Spark saves the reference to the object and records the memory occupied by the object

Releasing memory:
- Spark records the memory freed by the object and removes the reference to the object
- Spark waits for the JVM's garbage collection mechanism to actually free the heap memory occupied by the object
We know that JVM objects can be stored in serialized form. Serialization converts an object into a binary byte stream, which can essentially be understood as converting the chained storage of non-contiguous space into contiguous or block storage. Accessing the data then requires the reverse process, deserialization, which converts the byte stream back into an object. Serialization saves storage space but increases the computational overhead of storing and reading.
For serialized objects in Spark, because they exist as byte streams, the occupied memory size can be calculated directly. For non-serialized objects, the occupied memory is estimated by periodic sampling approximation, i.e. the occupied memory size is not computed for every new data item. This reduces time overhead but may introduce considerable error, so the actual memory at a given moment may far exceed expectations. In addition, an object instance marked as freed by Spark may well not actually have been reclaimed by the JVM, so the actual available memory may be less than the available memory Spark records. Therefore, Spark cannot accurately track the actual available heap memory, and thus cannot completely avoid OOM (Out of Memory) exceptions.
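A minimal sketch of why serialized size is directly measurable (standard Java serialization used purely for illustration, not Spark's serializer):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize a small collection to a byte array; the array length is the
// exact number of bytes this object occupies in serialized form, whereas
// the heap footprint of the live object graph can only be estimated.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(List(1, 2, 3))
oos.close()
println(s"serialized size: ${bos.toByteArray.length} bytes")
```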
Although it cannot precisely control the allocation and release of on-heap memory, Spark, through its independent planning and management of storage memory and execution memory, can decide whether to cache a new RDD in storage memory and whether to allocate execution memory to a new task, which improves memory utilization and reduces the occurrence of exceptions to a certain extent.
To further optimize memory usage and improve the efficiency of Shuffle sorting, Spark introduces off-heap memory, which allows it to allocate space directly in the system memory of worker nodes to store serialized binary data. Using the JDK Unsafe API (starting with Spark 2.0, off-heap storage memory is no longer managed via Tachyon but, like off-heap execution memory, is implemented on the JDK Unsafe API), Spark can operate off-heap memory directly, reducing unnecessary memory overhead as well as frequent GC scanning and reclamation, thereby improving processing performance. Off-heap memory can be precisely requested and released, and the space occupied by serialized data can be precisely calculated, so it is easier to manage and less error-prone than on-heap memory.
Off-heap memory is not enabled by default; it can be enabled with the spark.memory.offHeap.enabled parameter, and the amount of off-heap space is set by the spark.memory.offHeap.size parameter. Apart from having no "other" space, off-heap memory is divided in the same way as on-heap memory, and all running concurrent tasks share its storage memory and execution memory.
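A minimal sketch of enabling it, reusing the conf from the earlier sketch (the 1 GB size is an illustrative assumption):

```scala
// Enable off-heap memory and give it 1 GB;
// spark.memory.offHeap.size is specified in bytes.
conf.set("spark.memory.offHeap.enabled", "true")
conf.set("spark.memory.offHeap.size", (1024L * 1024 * 1024).toString)
```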
Memory Management Interface
Spark provides a unified interface, MemoryManager, for managing storage memory and execution memory; tasks within the same Executor call this interface's methods to request or release memory:
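The core methods look roughly as follows (abridged and lightly paraphrased from Spark 2.1's MemoryManager; visibility modifiers omitted, comments added):

```scala
// Request storage memory for a block
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// Request unroll memory for a block
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// Request execution memory for a task attempt; returns the bytes granted
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
// Release storage memory
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
// Release execution memory held by a task attempt
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
// Release unroll memory
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit
```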
Note that calling these methods requires specifying a memory mode (MemoryMode); this parameter determines whether the operation is performed on-heap or off-heap.
In the concrete implementation of MemoryManager, Spark has defaulted to the unified memory management mode since 1.6; the static memory management mode used before 1.6 is still retained and can be enabled with the spark.memory.useLegacyMode parameter. The two modes differ in how they allocate space, as described below.
Memory Space Allocation
Static Memory Management – On-heap
Under Spark's original static memory management mechanism, the sizes of storage memory, execution memory, and other memory are fixed for the duration of the Spark application, but can be configured by the user before the application starts. The on-heap allocation is shown in Figure 3.
Figure 3 Static memory management diagram – on-heap
As can be seen, the sizes of the available on-heap storage and execution memory are calculated as follows:
> Available storage memory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
> Available execution memory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
Here systemMaxMemory depends on the size of the current JVM heap, and the final available execution memory or storage memory is obtained by multiplying it by the respective memoryFraction and safetyFraction parameters. The significance of the two safetyFraction parameters is to logically reserve a safety region of 1 - safetyFraction, reducing the risk of OOM when actual memory exceeds the preset range (recall the sampling error in estimating the memory of non-serialized objects). Note that this reserved region is purely a logical plan; Spark does not treat it differently when using memory, and it is left to the JVM to manage, like "other" memory.
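For concreteness, a worked example assuming the Spark 1.x defaults (spark.storage.memoryFraction = 0.6, spark.storage.safetyFraction = 0.9, spark.shuffle.memoryFraction = 0.2, spark.shuffle.safetyFraction = 0.8) and a 4096 MB heap:

> Available storage memory = 4096 MB * 0.6 * 0.9 ≈ 2212 MB
> Available execution memory = 4096 MB * 0.2 * 0.8 ≈ 655 MB

The remaining ~1229 MB (including the two logically reserved safety regions) is "other" memory managed directly by the JVM.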
Static Memory Management – Off-heap
The allocation of off-heap space is relatively simple: the sizes of storage memory and execution memory are likewise fixed, as shown in Figure 4. The share of available execution memory versus storage memory is directly determined by the parameter spark.memory.storageFraction, and since the space occupied off-heap can be calculated precisely, there is no need for a safety region.
Figure 4 Static memory management diagram – off-heap
The static memory management mechanism is relatively simple to implement, but if users are unfamiliar with Spark's storage mechanism, or fail to configure it for their specific data scale and computing tasks, it easily produces a "half seawater, half flame" situation: one of storage memory and execution memory has plenty of free space while the other fills up early and is forced to evict or spill old content to store new content. With the advent of the new memory management mechanism, this approach is now rarely used by developers; Spark retains its implementation for compatibility with older application versions.
Unified Memory Management
The unified memory management mechanism introduced in Spark 1.6 differs from static memory management in that storage memory and execution memory share the same space and can dynamically occupy each other's free area, as shown in Figures 5 and 6.
Figure 5 Unified memory management diagram – on-heap
Figure 6 Unified memory management diagram – off-heap
The most important optimization is the dynamic occupancy mechanism, whose rules are as follows (a worked sizing example follows Figure 7):
- Set the basic storage memory and execution memory regions (via the spark.memory.storageFraction parameter); this setting determines the range of space each side owns.
- When neither side has enough space, data is spilled to disk; when one side's own space is insufficient and the other side has free space, it may borrow the other side's space ("insufficient storage space" means insufficient to fit a complete Block).
- After execution memory space has been occupied by the other side, it can make the other side transfer the occupied portion to disk and then "return" the borrowed space.
- After storage memory space has been occupied by the other side, it cannot make the other side "return" it, because many factors in the Shuffle process must be considered, and implementation would be quite complicated.
Figure 7 Illustration of dynamic occupancy mechanism
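For a sense of scale, a worked example assuming Spark 2.1 defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, and a fixed 300 MB of reserved memory) with a 4096 MB heap:

> Unified memory = (4096 MB - 300 MB) * 0.6 ≈ 2278 MB
> Initial storage region = 2278 MB * 0.5 = 1139 MB
> Initial execution region = 2278 MB - 1139 MB = 1139 MB

Under the dynamic occupancy rules above, either region may grow beyond its initial share when the other has free space.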
With the unified memory management mechanism, Spark improves the utilization of on-heap and off-heap memory resources to a certain extent and reduces the burden on developers of maintaining Spark memory, but that does not mean developers can rest easy. For example, if the storage memory space is too large, or too much data is cached, frequent full garbage collections will result and task execution performance will suffer, because cached RDD data usually resides in memory long-term. Therefore, to fully exploit Spark's performance, developers need to further understand how storage memory and execution memory are managed and implemented.
The following sections cover two topics: storage memory management, including the RDD persistence mechanism and the processes of RDD caching, eviction, and spilling to disk; and execution memory management, including memory allocation among multiple tasks and Shuffle's memory consumption.
Storage Memory Management
RDD Persistence Mechanism
As Spark's most fundamental data abstraction, a Resilient Distributed Dataset (RDD) is a read-only collection of partitions that can only be created from datasets in stable physical storage, or by transforming an existing RDD into a new one. The dependencies between a transformed RDD and its original RDD constitute its lineage, through which Spark guarantees that every RDD can be recovered. However, all RDD transformations are lazy: only when an action that returns a result to the Driver occurs does Spark create a task to read the RDD and actually trigger execution of the transformations.
When a Task reads a partition at startup, it first determines whether the partition has been persisted; if not, it needs to check the checkpoint or recompute the partition according to its lineage. Therefore, if multiple actions will be performed on an RDD, you can use the persist or cache method in the first operation to persist or cache the RDD in memory or on disk, improving computation speed in subsequent actions. In fact, the cache method persists the RDD to memory using the default MEMORY_ONLY storage level, so caching is a special kind of persistence. The design of on-heap and off-heap storage memory allows unified planning and management of the memory used to cache RDDs (other uses of storage memory, such as caching broadcast data, are beyond the scope of this article).
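A minimal usage sketch, assuming sc is an existing SparkContext (the HDFS path is an illustrative assumption):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs://namenode:8020/data/input")  // hypothetical path
  .map(_.toUpperCase)

rdd.persist(StorageLevel.MEMORY_ONLY)  // equivalent to rdd.cache()

rdd.count()    // first action: computes the RDD and caches its partitions
rdd.collect()  // second action: served from the cache, no recomputation
```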
RDD persistence is handled by Spark's Storage module, which decouples RDDs from physical storage. The Storage module manages the data Spark generates during computation, encapsulating the functions for accessing data in memory or on disk, locally or remotely. In the concrete implementation, the Storage modules on the Driver side and the Executor side form a master-slave architecture: the BlockManager on the Driver side is the master, and the BlockManagers on the Executor side are the slaves. The Storage module logically uses the Block as its basic storage unit, and each partition of an RDD uniquely corresponds to a Block after processing (the BlockId format is rdd_RDD-ID_PARTITION-ID). The master is responsible for managing and maintaining the metadata of all blocks in the entire Spark application, while the slaves report block status updates to the master and receive commands from it, such as adding or deleting an RDD.
Figure 8 Storage module diagram
When persisting an RDD, Spark provides seven different storage levels such as MEMORY_ONLY and MEMORY_AND_DISK, and each storage level is a combination of the following five variables:
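Slightly abridged from Spark's StorageLevel class (constructor parameters as in Spark 2.1; comments added):

```scala
class StorageLevel private(
    private var _useDisk: Boolean,      // use disk
    private var _useMemory: Boolean,    // use on-heap memory
    private var _useOffHeap: Boolean,   // use off-heap memory
    private var _deserialized: Boolean, // store in non-serialized form
    private var _replication: Int = 1   // number of replicas
) extends Externalizable
```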
From this data structure, it can be seen that a storage level defines the storage mode of an RDD's Partition (which corresponds to a Block) along three dimensions:
- Storage location: disk / on-heap memory / off-heap memory. For example, MEMORY_AND_DISK can use both disk and on-heap memory. OFF_HEAP stores only in off-heap memory; currently, when off-heap memory is selected, the data cannot be stored in other locations at the same time.
- Storage form: whether the Block is in non-serialized form after being cached in storage memory. For example, MEMORY_ONLY stores in non-serialized form, while OFF_HEAP stores in serialized form.
- Number of replicas: when greater than 1, remote redundant backup to other nodes is required. For example, DISK_ONLY_2 requires one remote backup copy.
Before an RDD is cached into storage memory, the data in a partition is generally accessed through the Iterator data structure, a standard way of traversing data collections in Scala. Through the Iterator one can obtain each serialized or non-serialized data item (Record) in the partition; these Record object instances logically occupy the "other" part of the JVM's on-heap memory, and the spaces of different Records in the same partition are not contiguous.
After the RDD is cached into storage memory, the partition is converted into a Block, and the Records occupy a contiguous piece of space in on-heap or off-heap memory. Spark calls this process of converting a partition from non-contiguous to contiguous storage space "Unroll". A Block has two storage formats, serialized and non-serialized, depending on the RDD's storage level. A non-serialized Block is defined by a DeserializedMemoryEntry data structure, which stores all the Java object instances in an array, while a serialized Block is defined by a SerializedMemoryEntry data structure, which stores the binary data in byte buffers. Each Executor's Storage module uses a linked map structure (LinkedHashMap) to manage all the Block object instances stored in memory both on-heap and off-heap, and additions to and removals from this LinkedHashMap indirectly record the requesting and releasing of memory.
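A hedged sketch of the two entry shapes described above (simplified from Spark 2.1's MemoryStore; imports, ClassTag fields, and visibility modifiers are omitted):

```scala
// Common shape of a cached block held in memory
sealed trait MemoryEntry[T] {
  def size: Long              // bytes occupied by this block
  def memoryMode: MemoryMode  // ON_HEAP or OFF_HEAP
}

// Non-serialized block: an array of live object instances (on-heap only)
case class DeserializedMemoryEntry[T](value: Array[T], size: Long)
  extends MemoryEntry[T] { val memoryMode = MemoryMode.ON_HEAP }

// Serialized block: binary data held in (chunked) byte buffers
case class SerializedMemoryEntry[T](buffer: ChunkedByteBuffer, size: Long,
    memoryMode: MemoryMode) extends MemoryEntry[T]
```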
Because there is no guarantee that storage space can hold all the data in the Iterator at once, the current computing task must request sufficient Unroll space from the MemoryManager to temporarily occupy a place when unrolling; the Unroll fails if the space is insufficient, and can continue when enough space is available. For a serialized partition, the required Unroll space can be accumulated and requested once directly. A non-serialized partition must request space incrementally while traversing the Records: each time a Record is read, the required Unroll space is estimated by sampling and then requested; when space is insufficient, the process can be interrupted and the occupied Unroll space released. If the Unroll finally succeeds, the Unroll space occupied by the current partition is converted into normal storage space for the cached RDD, as shown in Figure 9 below.
Figure 9 Spark Unroll schematic diagram
As can be seen in Figures 3 and 5, under static memory management Spark carves out a dedicated, fixed-size Unroll space within storage memory, whereas unified memory management makes no special distinction for Unroll space; when storage space is insufficient, it is handled according to the dynamic occupancy mechanism.
Since all computing tasks of the same Executor share a limited storage memory space, when a new Block needs to be cached but the remaining space is insufficient and cannot be dynamically occupied, the old Blocks in the LinkedHashMap must be evicted (Eviction); if the evicted Block's storage level also includes the requirement to store on disk, it must be dropped to disk, otherwise the Block is deleted directly.
The eviction rules of storage memory are:
- The evicted old Block must have the same MemoryMode as the new Block, i.e. both belong to off-heap or on-heap memory
- The old Block cannot belong to the same RDD as the new Block, to avoid cyclic eviction
- The RDD to which the old Block belongs must not be in the read state, to avoid consistency problems
- Traverse the Blocks in the LinkedHashMap and evict in least-recently-used (LRU) order until the space required by the new Block is satisfied; LRU ordering is a feature of LinkedHashMap
The process of dropping to disk is relatively simple: if the storage level satisfies the condition that _useDisk is true, then judge from _deserialized whether the data is in non-serialized form, and if so, serialize it; finally, store the data to disk and update its information in the Storage module.
Execution Memory Management
Memory Allocation Between Tasks
Tasks running within an Executor likewise share execution memory, and Spark uses a HashMap structure to maintain the mapping from each task to its memory consumption. The execution memory each task can occupy ranges from 1/2N to 1/N, where N is the number of tasks currently running in the Executor. At startup, each task must request at least 1/2N of the execution memory from the MemoryManager; if the request cannot be satisfied, the task is blocked until other tasks release enough execution memory. A sketch of these bounds follows.
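A minimal sketch (not Spark's actual code) of how such per-task bounds can be enforced; poolSize, poolFree, numActiveTasks, and curTaskMemory are assumed inputs:

```scala
// Cap a task's grant so that, after granting, it holds at most
// poolSize / N bytes of execution memory.
def maxBytesToGrant(poolSize: Long, numActiveTasks: Int,
                    curTaskMemory: Long, requested: Long): Long = {
  val maxPerTask = poolSize / numActiveTasks        // 1/N upper bound
  // never grant beyond the task's remaining headroom
  math.max(0L, math.min(requested, maxPerTask - curTaskMemory))
}

// A task stays blocked until it could hold at least poolSize / (2 * N).
def canProceed(poolFree: Long, poolSize: Long, numActiveTasks: Int,
               curTaskMemory: Long): Boolean = {
  val minPerTask = poolSize / (2L * numActiveTasks) // 1/2N lower bound
  curTaskMemory + poolFree >= minPerTask            // otherwise block
}
```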
Shuffle's Memory Usage
Execution memory is mainly used to store the memory occupied by tasks when executing Shuffle, the process of repartitioning RDD data according to certain rules. Let's look at how the Write and Read stages of Shuffle use execution memory:
Shuffle Write:
- If the ordinary sorting method is chosen on the map side, an ExternalSorter performs the external sort, mainly occupying on-heap execution space when storing data in memory.
- If Tungsten's sorting method is chosen on the map side, a ShuffleExternalSorter directly sorts data stored in serialized form, occupying off-heap or on-heap execution space when storing data in memory, depending on whether the user has enabled off-heap memory and whether the off-heap execution memory is sufficient.

Shuffle Read:
- When aggregating data on the reduce side, the data is handed to an Aggregator for processing, occupying on-heap execution space when storing data in memory.
- If the final result needs to be sorted, the data is handed to an ExternalSorter again, occupying on-heap execution space.
In ExternalSorter and Aggregator, Spark uses a hash table called AppendOnlyMap to store data in on-heap memory, but not all data can be kept in this hash table during the Shuffle process. The memory occupied by the hash table is periodically sampled and estimated; when it grows large enough that no new execution memory can be requested from the MemoryManager, Spark stores its entire contents in a disk file, a process called spilling (Spill), and the files spilled to disk are eventually merged.
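A hedged sketch of the spill decision (the 5 MB initial reservation and the doubling strategy follow Spark's spillable collections, but this is a simplification, not the exact implementation):

```scala
// Start with a small reservation; try to grow it as the sampled size
// estimate increases, and spill when the MemoryManager cannot keep up.
var myMemoryThreshold = 5L * 1024 * 1024  // initial reservation (assumed 5 MB)

def maybeSpill(estimatedSize: Long, acquire: Long => Long): Boolean = {
  if (estimatedSize >= myMemoryThreshold) {
    // request enough to hold twice the current estimate
    val amountToRequest = 2 * estimatedSize - myMemoryThreshold
    myMemoryThreshold += acquire(amountToRequest)  // may be granted partially
    // still above the threshold: write the whole in-memory map to disk
    estimatedSize >= myMemoryThreshold
  } else false
}
```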
Tungsten, used in the Shuffle Write stage, is Databricks' plan to optimize memory and CPU usage for Spark, addressing some of the JVM's performance limitations and drawbacks. Spark automatically chooses whether to use Tungsten sorting based on the Shuffle situation. Tungsten's paged memory management mechanism is built on top of the MemoryManager, i.e. Tungsten abstracts the use of execution memory one step further, so that during Shuffle there is no need to care whether data is stored on-heap or off-heap. Each memory page is defined by a MemoryBlock, and the two variables Object obj and long offset are used to uniformly identify the address of a memory page in system memory. An on-heap MemoryBlock is memory allocated in the form of a long array: its obj value is the object reference of that array, and its offset is the initial offset address of the long array within the JVM; together the two can locate the array's absolute address in the heap. An off-heap MemoryBlock is a directly allocated memory block: its obj is null, and its offset is the 64-bit absolute address of the memory block in system memory. With MemoryBlock, Spark cleverly abstracts and encapsulates both on-heap and off-heap memory pages, and uses a page table to manage the memory pages requested by each Task.
All memory under Tungsten's page management is represented by a 64-bit logical address consisting of a page number and an in-page offset:
- Page number: 13 bits, uniquely identifying a memory page; Spark must obtain a free page number before requesting a memory page.
- In-page offset: 51 bits, the offset address of the data within the memory page when the page is used to store data.
With this unified addressing scheme, Spark can locate on-heap or off-heap memory with a single 64-bit logical address pointer, and the entire Shuffle Write sorting process only needs to sort the pointers, with no deserialization required. The whole process is very efficient and brings a clear improvement in memory access efficiency and CPU usage efficiency.
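A minimal encoding sketch, assuming the 13/51-bit split above (names and constants are illustrative, not Spark's exact implementation):

```scala
object PageAddress {
  val PageNumberBits = 13
  val OffsetBits = 64 - PageNumberBits          // 51 bits for the offset
  val MaxPageNumber = (1L << PageNumberBits) - 1
  val OffsetMask = (1L << OffsetBits) - 1

  // Pack a page number (high 13 bits) and in-page offset (low 51 bits)
  def encode(pageNumber: Int, offsetInPage: Long): Long = {
    require(pageNumber >= 0 && pageNumber <= MaxPageNumber, "invalid page")
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)
  }

  def decodePageNumber(address: Long): Int = (address >>> OffsetBits).toInt
  def decodeOffset(address: Long): Long = address & OffsetMask
}
```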
Summary