In the field of big data computing, Spark has become one of the most popular computing platforms. Its functionality covers the main types of big data workloads, such as offline batch processing, SQL processing, streaming/real-time computing, machine learning, and graph computing, and it has a wide range of applications and good prospects. Most people (the author included) first tried Spark for a simple reason: they wanted their big data computing jobs to run faster.

However, developing high-performance big data computing jobs with Spark is not so simple. If a Spark job is not tuned properly, it may run slowly and fail to show the advantages of Spark as a fast big data computing engine. Therefore, to use Spark well, you must tune its performance appropriately.

Spark performance tuning is made up of many parts; it is not the case that adjusting a few parameters immediately improves job performance. We need to analyze the Spark job comprehensively according to the business scenario and data characteristics, and then tune and optimize several aspects to obtain the best performance.

This topic serves as the basis for the Spark performance tuning guide, mainly explaining development tuning and resource tuning.

2 Development Tuning

3 Tuning Overview

The first step in Spark performance optimization is to pay attention to and apply some basic principles of performance optimization while developing Spark jobs. Development tuning means understanding the basic development principles of Spark described below, including RDD lineage design, reasonable use of operators, optimization of special operations, and so on. During development, you should keep these principles in mind and apply them flexibly to your own Spark jobs according to the specific business and application scenario.

4 Principle 1: Avoid creating duplicate RDDs

Generally speaking, when we develop a Spark job, we first create an initial RDD from a data source (such as a Hive table or an HDFS file), then apply an operator to that RDD to get the next RDD, and so on, until we have computed the final result we need. In this process, multiple RDDs are strung together through different operator operations (such as map, reduce, etc.), and this "RDD string" is the RDD lineage, that is, the "chain of blood relationships" between RDDs.

During development, we should note that for the same data, only one RDD should be created, and multiple RDDs should not be created to represent the same data.

Some Spark beginners may forget that they have already created an RDD for a certain piece of data before, resulting in multiple RDDs for the same data. This means that our Spark job will perform multiple repeated calculations to create multiple RDDs representing the same data, which increases the performance overhead of the job.

A simple example

Suppose we need to perform a map operation and then a reduce operation on an HDFS file named "hello.txt"; that is, two operator operations need to be performed on the same piece of data.

Wrong practice: creating multiple RDDs when performing multiple operators on the same data. Here the textFile method is executed twice, two RDDs are created for the same HDFS file, and an operator operation is then executed on each RDD. In this case, Spark loads the contents of hello.txt from HDFS twice and creates two separate RDDs; the performance overhead of loading the HDFS file and creating the RDD a second time is simply wasted.

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd1.map(...)

val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd2.reduce(...)

Correct practice: when performing multiple operator operations on the same piece of data, use only one RDD. This is obviously much better than the previous version, because we create only one RDD for the data and then perform multiple operator operations on it. Note, however, that the optimization does not end here: rdd1 is still executed twice, so when the reduce operation runs, the data of rdd1 is recomputed from the source again, and there is still the overhead of repeated computation. To solve this completely, it must be combined with "Principle 3: Persist RDDs that are used multiple times", which ensures that an RDD is computed only once even when it is used multiple times.

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd1.map(...)
rdd1.reduce(...)

5 Principle 2: Reuse the same RDD whenever possible

In addition to avoiding the creation of multiple RDDs for exactly the same data during development, you should also reuse one RDD as much as possible when operator operations are performed on different but overlapping data. For example, suppose one RDD is in key-value format and another holds only values, and the value data of the two RDDs is exactly the same; then we can use only the key-value RDD, since it already contains the other RDD's data. For cases like this, where the data of multiple RDDs overlaps or one contains the other, we should try to reuse a single RDD. This reduces the number of RDDs and therefore the number of operator executions as much as possible.

A simple example

Wrong practice: there is an RDD in key-value format, rdd1. Due to business needs, a map operation is performed on rdd1 to create rdd2, whose data is only the value part of rdd1; that is, rdd2 is a subset of rdd1.

JavaPairRDD rdd1 = ...
JavaRDD rdd2 = rdd1.map(...)

Then different operator operations are performed on rdd1 and rdd2 respectively.

rdd1.reduceByKey(...)
rdd2.map(...)

Correct practice: in the case above, the only difference between rdd1 and rdd2 is the data format, and the data of rdd2 is entirely a subset of rdd1's, yet two RDDs are created and an operator operation is performed on each. Creating rdd2 requires one extra pass of the map operator over rdd1's data, which adds performance overhead. In fact, we can reuse the same RDD: use rdd1 for both the reduceByKey and the map operation, and in the map operation read only tuple._2 of each record, i.e. the value part of rdd1.

JavaPairRDD rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2 ...)

Compared with the first approach, the second one clearly saves the overhead of computing rdd2. But the optimization is still not over: we still perform two operator operations on rdd1, so rdd1 is actually computed twice. Therefore, this must also be combined with "Principle 3: Persist RDDs that are used multiple times" to ensure that an RDD is computed only once when it is used multiple times.
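To make the idea concrete, here is a minimal Scala sketch of the reuse pattern; it assumes an existing SparkContext named sc (as in the other examples), and the keys, values, and operations are purely illustrative.

// One pair RDD holds both the keys and the values; sc is an existing SparkContext.
val rdd1 = sc.parallelize(Seq((1L, "apple"), (2L, "banana"), (1L, "cherry")))

// First operation: aggregate the values by key directly on rdd1.
val aggregated = rdd1.reduceByKey((a, b) => a + "," + b)

// Second operation: work only on the values by reading tuple._2 inside map,
// instead of first building a separate value-only RDD ("rdd2") and mapping over it.
val valueLengths = rdd1.map(tuple => tuple._2.length)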

6 Principle 3: Persist RDDs that are used multiple times

When you have already performed operator operations on the same RDD multiple times in your Spark code, congratulations: you have implemented the first step of Spark job optimization, namely reusing RDDs as much as possible. On this basis it is time for the second step of optimization: ensuring that when multiple operator operations are performed on an RDD, the RDD itself is computed only once.

Spark's default behavior when you run multiple operators on an RDD is this: every time an operator operation is performed on the RDD, the RDD is recomputed from the source, and only then is your operator executed on it. The performance of this approach is very poor.

Therefore, our recommendation for this situation is: persist the RDD that is used multiple times. At this point, Spark will save the data in the RDD to memory or disk according to your persistence policy. In the future, every time the operator operation is performed on this RDD, the persistent RDD data will be extracted directly from memory or disk, and then the operator will be executed, instead of recalculating the RDD from the source and then performing the operator operation.

Code example for persisting an RDD that is used multiple times

To persist an RDD, simply call cache() or persist() on it.

Correct practice: the cache() method attempts to persist all of the RDD's data to memory in non-serialized form. With it, when two operators are performed on rdd1, rdd1 is computed from the source only once, when the map operator executes for the first time; when the reduce operator executes afterwards, the data is read directly from memory, and the RDD is not computed again.

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()

rdd1.map(...)
rdd1.reduce(...)

The persist() method lets you manually choose a persistence level and persist the data in the specified way. For example, StorageLevel.MEMORY_AND_DISK_SER means data is persisted to memory first when memory is sufficient, and spilled to disk files when it is not. The _SER suffix indicates that the RDD data is stored in serialized form: each partition of the RDD is serialized into one large byte array and then persisted to memory or disk. Serialization reduces the memory/disk footprint of persisted data and therefore helps avoid persisted data taking up too much memory and causing frequent GC.

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)

rdd1.map(...)
rdd1.reduce(...)

For the persist() method, we can choose different persistence levels according to different business scenarios.

Spark's persistence levels

MEMORY_ONLY: keeps the data in memory in non-serialized Java object format. If memory is not enough to hold all the data, some data may not be persisted; the next time an operator operation is performed on the RDD, the data that was not persisted has to be recomputed from the source. This is the default persistence policy, and it is what the cache() method actually uses.

MEMORY_AND_DISK: uses the non-serialized Java object format and tries to keep the data in memory first. If memory is not enough to hold all the data, the overflow is written to disk files, and the next time an operator runs on this RDD, the data persisted in the disk files is read back and used.

MEMORY_ONLY_SER: basically the same as MEMORY_ONLY. The only difference is that the RDD's data is serialized, with each partition of the RDD serialized into a byte array. This saves memory and helps avoid frequent GC caused by persisted data taking up too much memory.

MEMORY_AND_DISK_SER: basically the same as MEMORY_AND_DISK. The only difference is that the RDD's data is serialized, with each partition of the RDD serialized into a byte array. This saves memory and helps avoid frequent GC caused by persisted data taking up too much memory.

DISK_ONLY: writes all data to disk files in non-serialized Java object format.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: for any of the above levels, adding the suffix _2 means each piece of persisted data is replicated and the copy is stored on another node. This replica-based persistence is mainly for fault tolerance: if a node goes down and the persisted data in its memory or on its disk is lost, subsequent RDD computations can still use the replica on another node; without a replica, the data would have to be recomputed from the source.

How to choose the most appropriate persistence strategy

  • By default, the highest performance is of course MEMORY_ONLY, but only if your memory is large enough to hold all the data of the entire RDD. Because no serialization or deserialization occurs, that part of the overhead is avoided; subsequent operator operations on the RDD work on pure in-memory data without reading from disk files, so performance is also very high; and there is no need to replicate the data and ship it to another node. Note, however, that in a real production environment the scenarios where this strategy can be used directly are probably limited: if the RDD contains a lot of data (say billions of records), persisting at this level directly will cause a JVM OOM (out of memory) exception.

  • If a memory overflow occurs when using a MEMORY_ONLY level, it is recommended to try a MEMORY_ONLY_SER level. This level serializes the RDD data and then stores it in memory, where each partition is just a byte array, greatly reducing the number of objects and reducing the memory footprint. The performance overhead of this level is more than MEMORY_ONLY, mainly the overhead of serialization and deserialization. However, subsequent operators can be operated based on pure memory, so the overall performance is relatively high. In addition, the problem that may occur is the same as above, if the amount of data in the RDD is too large, it may still cause an exception of OOM memory overflow.

  • If none of the memory-only levels can be used, it is recommended to use the MEMORY_AND_DISK_SER policy rather than MEMORY_AND_DISK. Reaching this point means the RDD's data volume is very large and cannot fit entirely in memory; serialized data is smaller, which saves memory and disk space. At the same time, this policy still tries to cache as much data in memory as possible, and writes to disk only when memory is insufficient.

  • DISK_ONLY and the levels with the suffix _2 are generally not recommended. Reading and writing data entirely from disk files causes a drastic drop in performance, and sometimes recomputing the RDD once is actually faster. With the _2 levels, all data must be replicated and sent to other nodes; data replication and network transfer add a large performance overhead, so these levels are not recommended unless the job requires high availability.

7 Principle 4: Try to avoid using shuffle operators

If possible, avoid shuffle operators, because the shuffle process is the most performance-consuming part of a running Spark job. Put simply, the shuffle process pulls records with the same key, distributed across multiple nodes in the cluster, to the same node for aggregation or join operations. Operators such as reduceByKey and join trigger shuffle operations.

During a shuffle, each node first writes records with identical keys to local disk files, and then other nodes pull the records with the same keys from those disk files over the network. Furthermore, when records with the same key are pulled to one node for aggregation, a single node may end up processing too many keys, memory may run short, and data spills to disk files. Therefore, during a shuffle a large amount of disk read/write IO and network data transfer may occur. Disk IO and network transfer are the main reasons for the shuffle's poor performance.

Therefore, during development we should avoid shuffle operators such as reduceByKey, join, distinct, and repartition whenever possible, and prefer non-shuffle operators of the map class. Spark jobs with no shuffle operations, or with fewer shuffle operations, can save a great deal of performance overhead.

Code example of join with Broadcast + map

A traditional join operation leads to a shuffle, because the records with the same key in the two RDDs have to be pulled over the network to the same node and joined by one task.

val rdd3 = rdd1.join(rdd2)

The Broadcast + map style of join does not cause a shuffle. Use Broadcast to turn the smaller RDD's data into a broadcast variable.

val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

In the rdd1.map operator, all of rdd2's data can be obtained from rdd2DataBroadcast. Traverse it, and whenever a record in rdd2 has the same key as the current record of rdd1, the two can be joined; stitch the current rdd1 record together with the matching rdd2 record (as a String or a Tuple) in whatever way the business requires.

val rdd3 = rdd1.map(rdd2DataBroadcast...)

Note that this approach is only recommended when rdd2's data volume is relatively small (say a few hundred MB, or one or two GB), because a full copy of rdd2's data will reside in the memory of every Executor.
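A slightly fuller sketch of the Broadcast + map join described above; it assumes an existing SparkContext sc, and the data, key types, and variable names are illustrative only, not taken from a real job.

// rdd1 is the large RDD; rdd2 is the small RDD we would otherwise join with.
val rdd1 = sc.parallelize(Seq((1, "order-a"), (2, "order-b"), (3, "order-c")))
val rdd2 = sc.parallelize(Seq((1, "userA"), (2, "userB")))

// Collect the small RDD to the driver and broadcast it; each Executor keeps one copy.
val rdd2DataBroadcast = sc.broadcast(rdd2.collectAsMap())

// "Join" inside a map over rdd1 by looking each key up in the broadcast map.
// No shuffle is triggered, since rdd2's data is already present on every Executor.
val rdd3 = rdd1.map { case (key, value) =>
  (key, (value, rdd2DataBroadcast.value.get(key)))
}

Because the lookup returns an Option, keys with no match end up as None, so this sketch behaves like a left outer join; adjust it to whatever join semantics the business actually needs.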

8 Principle 5: Use shuffle operations with map-side pre-aggregation

If a shuffle operation is unavoidable because of business needs and cannot be replaced by a map-class operator, then try to use operators that perform map-side pre-aggregation.

So-called map-side pre-aggregation means aggregating records with the same key locally on each node, similar to the local combiner in MapReduce. After map-side pre-aggregation, each node holds only one record per key locally, because the multiple records with the same key have been aggregated together. When other nodes then pull that key from all nodes, the amount of data to pull is greatly reduced, which reduces disk IO and network transfer overhead. In general, it is recommended to use reduceByKey or aggregateByKey instead of groupByKey where possible, because both reduceByKey and aggregateByKey use a user-defined function to pre-aggregate identical keys locally on each node, whereas groupByKey does no pre-aggregation: the full data set is distributed and transferred between the nodes of the cluster, so its performance is comparatively poor.

For example, the following two figures show a typical example: word counting based on groupByKey and on reduceByKey respectively. The first figure is the schematic of groupByKey: all data is transferred between cluster nodes without any local aggregation. The second figure is the schematic of reduceByKey: data with the same key on each node is pre-aggregated locally before being transferred to other nodes for global aggregation.
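In code, the word-count contrast shown in those two figures looks roughly like the following sketch; it assumes an existing SparkContext sc, and the word list is illustrative.

val words = sc.parallelize(Seq("spark", "hello", "spark", "world", "hello", "spark"))
val pairs = words.map(word => (word, 1))

// groupByKey: every (word, 1) record is shuffled across the cluster before counting.
val countsGrouped = pairs.groupByKey().mapValues(ones => ones.sum)

// reduceByKey: each node first sums its own records per key (map-side pre-aggregation),
// so far less data is shuffled before the global aggregation.
val countsReduced = pairs.reduceByKey(_ + _)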

9 Principle 6: Use high-performance operators

Besides the optimization principles for shuffle-related operators, other operators have corresponding optimization principles as well.

Using reduceByKey/aggregateByKey instead of groupByKey

See "Principle 5: Use shuffle operations with map-side pre-aggregation" above for details.

Using mapPartitions instead of ordinary map

With operators of the mapPartitions class, one function call processes all the data of a partition rather than one record at a time, so performance is relatively higher. Sometimes, however, using mapPartitions causes OOM (out of memory) problems: because a single function call has to process all the data of a partition, if memory is insufficient, too many objects cannot be reclaimed during garbage collection and an OOM exception is likely. So use this kind of operation with caution.
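A small sketch of the difference, assuming an existing SparkContext sc; the per-record work here is trivial and only stands in for a more expensive per-call setup cost (for example, building a parser or a connection) that you would rather pay once per partition.

val rdd = sc.parallelize(1 to 10, numSlices = 2)

// map: the function is invoked once per record.
val doubled = rdd.map(x => x * 2)

// mapPartitions: the function is invoked once per partition and receives an iterator
// over all of that partition's records. Keeping the result an iterator (rather than
// materializing a large collection) limits the OOM risk mentioned above.
val doubledPerPartition = rdd.mapPartitions { iter =>
  iter.map(x => x * 2)
}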

Using foreachPartitions instead of foreach

The principle is similar to "using mapPartitions instead of map": one function call processes all the data of a partition rather than one record per call. In practice, operators of the foreachPartitions class turn out to be very helpful for performance. For example, if the foreach function writes all the data in an RDD to MySQL, then with an ordinary foreach operator the data is written record by record, and each function call may create a database connection; connections are then created and destroyed frequently, and performance is very poor. If instead you use the foreachPartitions operator to process one partition's data at a time, only one database connection needs to be created per partition and the inserts can be done in batches, which performs much better. In practice, for writing on the order of 10,000 records to MySQL, this has improved performance by more than 30%.
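A hedged sketch of the foreachPartition pattern for writing to MySQL over JDBC; sc is an existing SparkContext, the JDBC URL, credentials, table, and column names are hypothetical, and a real job would add proper error handling.

import java.sql.DriverManager

val wordCounts = sc.parallelize(Seq(("spark", 3), ("hello", 2)))

wordCounts.foreachPartition { records =>
  // One connection and one prepared statement per partition, not per record.
  val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO word_count (word, cnt) VALUES (?, ?)")
  try {
    records.foreach { case (word, count) =>
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.addBatch()
    }
    // Insert the whole partition as one batch.
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}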

Using coalesce after filter

After running a filter operator on an RDD that filters out a large share of its data, it is recommended to use the coalesce operator to manually reduce the number of partitions and compress the remaining data into fewer partitions. The reason is that after the filter, each partition of the RDD has had a lot of data filtered out; if subsequent computation proceeds as before, the amount of data each task handles in its partition is quite small, which wastes resources, and the more tasks there are at that point, the slower things may actually run. Using coalesce to reduce the number of partitions and compress the RDD's data into fewer partitions means all of it can be processed with fewer tasks. In some scenarios this helps improve performance.
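A minimal sketch of the pattern, assuming an existing SparkContext sc; the filter condition and partition counts are placeholders for the real job's values.

val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

// Suppose the filter discards the vast majority of records in every partition.
val filtered = rdd.filter(x => x % 100 == 0)

// Compress the surviving data into far fewer partitions so the follow-up stages
// run with fewer, fuller tasks. Without shuffle enabled, coalesce simply merges partitions.
val compacted = filtered.coalesce(10)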

Use repartitionAndSortWithinPartitions instead of repartition and sort class operations

repartitionAndSortWithinPartitions is an operator recommended on Spark's official site: if you need to sort after repartitioning, it is recommended to use the repartitionAndSortWithinPartitions operator directly, because it sorts within the same shuffle that performs the repartitioning. Doing the shuffle and the sort together is likely to perform better than shuffling first and sorting afterwards.
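A short sketch of the operator on a pair RDD, assuming an existing SparkContext sc; the key type, data, and partitioner are illustrative.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "z")))

// Repartition into 4 partitions and sort by key within each partition inside the
// same shuffle, instead of repartition(...) followed by a separate sort pass.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(4))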

10 Principle 7: Broadcast large variables

Sometimes during development you encounter scenarios where an operator function needs to use an external variable, especially a large one (say a large collection of 100 MB or more). In such cases, Spark's broadcast feature should be used to improve performance.

When an external variable is used in an operator function, by default, Spark copies multiple copies of the variable and transmits them to the task over the network, at which point each task has a copy of the variable. If the variable itself is large (such as 100M, or even 1G), the performance overhead of a large number of variable copies transmitted in the network, as well as the frequent GC caused by occupying too much memory in the executor of each node, will greatly affect performance.

Therefore, for the above situation, if the external variable used is relatively large, it is recommended to use Spark’s broadcast function to broadcast the variable. The broadcast variable will ensure that only one copy of the variable resides in the memory of each executor, and the task in the executor shares the copy of the variable in the executor when it is executed. In this way, the number of variable copies can be greatly reduced, thereby reducing the performance overhead of network transmission, and reducing the overhead of Executor memory and GC frequency.

Code example for broadcasting a large variable

The following code uses an external variable inside an operator function. Nothing special is done here, so every task gets its own copy of list1.

val list1 = ...
rdd1.map(list1...)

The following code wraps list1 as a broadcast variable of type Broadcast. When the operator function uses the broadcast variable, it first checks whether a copy already exists in the memory of the Executor where the current task runs; if so, it is used directly, and if not, a copy is pulled remotely from the Driver or another Executor node and kept in the local Executor's memory. Only one copy of the broadcast variable resides in each Executor's memory.

val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
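A self-contained sketch of the broadcast pattern, assuming an existing SparkContext sc; the contents of list1 and the filter condition are illustrative.

// Without broadcasting, this collection would be shipped with every task.
val list1 = (1 to 1000).toSet

// Broadcast it once; each Executor keeps a single copy in memory.
val list1Broadcast = sc.broadcast(list1)

val rdd1 = sc.parallelize(1 to 100000)

// Inside the operator, read the shared copy via .value instead of capturing list1 directly.
val filtered = rdd1.filter(x => list1Broadcast.value.contains(x))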

11 Principle 8: Use Kryo to optimize serialization performance

In Spark, serialization is mainly involved in three places:

  • When an external variable is used in an operator function, the variable is serialized and transmitted over the network (see "Principle 7: Broadcast large variables").
  • When a custom type is used as the generic type of an RDD (such as JavaRDD<Student>, where Student is a custom type), all objects of the custom type are serialized. In this case the custom class must therefore implement the Serializable interface.
  • When a serializing persistence level such as MEMORY_ONLY_SER is used, Spark serializes each partition of the RDD into one large byte array.

For all three places where serialization occurs, we can improve serialization and deserialization performance by using the Kryo serialization library. By default, Spark serializes and deserializes with Java's serialization mechanism, i.e. the ObjectOutputStream/ObjectInputStream API. However, Spark also supports the Kryo serialization library, whose performance is much higher than Java serialization; officially, the Kryo serialization mechanism is roughly 10 times faster than the Java one. The reason Spark does not use Kryo by default is that Kryo works best when all custom types to be serialized are registered, which is somewhat cumbersome for developers.

The following is a code example using Kryo. We only need to set the serializer and then register the custom types to be serialized (such as external variable types used in operator functions, custom types used as RDD generic types, etc.):

// Create a SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)

// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Register the custom types to be serialized.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

12 Principle 9: Optimize data structures

In Java, three kinds of types are particularly memory-hungry:

  • Objects: every Java object carries extra information such as an object header and references, so it takes additional memory.
  • Strings: every string has a character array inside it plus extra information such as its length.
  • Collection types, such as HashMap and LinkedList: collection types usually use internal classes to wrap their elements, such as Map.Entry.

Therefore, Spark officially recommends that in Spark code, especially in operator functions, you avoid the three data structures above as much as possible: use strings instead of objects, use primitive types (such as Int, Long) instead of strings, and use arrays instead of collection types. This reduces memory usage as much as possible, which in turn lowers GC frequency and improves performance.

However, in the author's coding practice it turns out to be hard to follow this principle strictly, because code maintainability has to be considered as well. If code has no object abstraction at all and is all string concatenation, maintaining and modifying it later becomes a huge disaster. Likewise, implementing everything with arrays instead of collection types such as HashMap and LinkedList would be a great challenge for coding effort and maintainability. Therefore, the author recommends using less memory-hungry data structures where possible and appropriate, but only on the premise that code maintainability is preserved.
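As a small illustration of the trade-off (the type and fields below are hypothetical), the memory-lean variant replaces a collection of objects with parallel primitive arrays; whether this is worth the loss in readability depends on the job.

// Readable but memory-heavy inside hot operator code: one object per record,
// plus the overhead of Strings and collections.
case class UserVisit(userId: Long, pageId: Int)
val visitsAsObjects: Seq[UserVisit] = Seq(UserVisit(1L, 10), UserVisit(2L, 20))

// Memory-lean alternative: parallel primitive arrays, where index i describes one visit.
val userIds: Array[Long] = Array(1L, 2L)
val pageIds: Array[Int] = Array(10, 20)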

13 Principle 10: Data locality level

PROCESS_LOCAL: process-local. Code and data are in the same process, i.e. the same Executor; the task computing the data runs in that Executor, and the data sits in that Executor's BlockManager. Performance is the best.
NODE_LOCAL: node-local. Code and data are on the same node; for example, the data is on the node as an HDFS block and the task runs in an Executor on that node, or the data and the task are in different Executors on the same node. Data has to be transferred between processes.
NO_PREF: for the task, the data is equally good no matter where it is obtained from; there is no better or worse location.
RACK_LOCAL: rack-local. Data and task are on two nodes within the same rack; data has to be transferred between nodes over the network.
ANY: data and task can be anywhere in the cluster, not even in the same rack. Performance is the worst.

spark.locality.wait, which is 3s by default

On the Driver, before allocating the tasks of each stage of the Application, Spark works out which piece of data each task will compute, i.e. a particular partition of an RDD. Spark's task allocation algorithm first tries to assign each task exactly to the node that holds the data it needs to compute, so that no data has to be transferred over the network.

However, a task may not get the chance to be placed on the node that holds its data, because that node's compute resources may already be fully occupied. In that case Spark usually waits for a while, 3 seconds by default (not strictly; in several situations it waits separately per locality level), and if the wait finally fails, it settles for a worse locality level: for example, it assigns the task to a node closer to the one holding the data, and the computation proceeds there.

In the second case, data transfer is usually unavoidable: the task obtains the data through the BlockManager of the node it runs on; the BlockManager finds that the data is not present locally and fetches it, via a getRemote() call, through the TransferService (the network data transfer component) from the BlockManager of the node where the data resides, and the data is sent back over the network to the node where the task runs.

For us, of course, the second case is what we want to avoid. The best situation is that the task and the data are on the same node, and the data is fetched directly from the local Executor's BlockManager, purely from memory or with a little disk IO. Once data has to be transferred over the network, performance will certainly drop: a large amount of network transfer and disk IO are performance killers.

When should I adjust this parameter?

Observe the running log of the Spark job. It is recommended to test in client mode first, so that the complete log can be read directly on the local machine. The log shows lines such as "Starting task ..." with PROCESS_LOCAL or NODE_LOCAL; observe the data locality level of the majority of tasks.

If most of them are PROCESS_LOCAL, there is no need to adjust anything. If you find that many are NODE_LOCAL or ANY, it is worth adjusting the data-locality wait time. The adjustment should be iterative: after each change, run the job again and look at the log to see whether the locality level of most tasks has improved and whether the running time of the whole Spark job has shortened. But be careful not to put the cart before the horse: if the locality level improves yet the job's running time grows because of all the extra waiting, then do not keep the adjustment.

spark.locality.wait, the default is 3s; you can change it to 6s, 10s

By default, the following three wait times are the same as the one above, all 3s.

spark.locality.wait.process  // recommendation: 60s
spark.locality.wait.node     // recommendation: 30s
spark.locality.wait.rack     // recommendation: 20s
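As a sketch, these parameters can be set on the SparkConf (they can equally be passed via --conf on spark-submit); the specific values below are the recommendations quoted above and should be validated against your own job's logs rather than applied blindly.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10s")
  .set("spark.locality.wait.process", "60s")
  .set("spark.locality.wait.node", "30s")
  .set("spark.locality.wait.rack", "20s")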