Speaker: Fan Youlun, Technical Expert, Alibaba Cloud

Edited by: Li Ming, Dmall

Published by: DataFunTalk

As big data technology has developed, Spark has become one of the most widely used computing engines in the field. In traditional production environments, Spark on YARN is the mainstream way to run jobs. With the spread of containerization and of storage-compute separation, and especially since this mode became generally available (GA) in Spark 3.1, Spark on K8s has become a clear trend.

Today's presentation covers two topics:

The basic concepts and features of Spark on K8s

Spark on K8s Optimization and Best Practices in Alibaba Cloud EMR


The basic concepts and features of Spark on K8s

First, let’s share some background on Spark on K8s.

1. Spark's cluster deployment modes

Spark currently supports four deployment modes:

Standalone: uses Spark's built-in scheduler. It is mainly used in test environments because it does not integrate with the big data scheduling frameworks and cannot fully utilize cluster resources.

Hadoop YARN: one of the most common modes. It originates from Hadoop and has a mature community ecosystem.

Apache Mesos: a resource management framework similar to YARN that is now gradually fading out of use.

Kubernetes: Spark on K8s. Spark 3.1.1 officially made this deployment mode generally available, and more and more users are actively adopting it.

Spark on K8s offers the following advantages:

Higher resource utilization: instead of deploying a separate cluster for each usage scenario, all Spark jobs share one cluster's resources. This raises overall utilization. On the cloud, container instances can also scale elastically, so you pay only for what you use.

Unified O&M: the K8s community ecosystem and tooling can manage all clusters in the same way, which reduces the O&M cost of switching between clusters.

Containerization: managing Spark through container images makes jobs more portable. It also avoids conflicts between different Spark versions and lets multiple versions run side by side for A/B testing.

Notably, in our tests the performance gap between Spark on K8s and Spark on YARN is negligible on identical cluster resources. Combined with the elastic resources available to Spark on K8s, Spark jobs can be accelerated further.

In summary, compared with Spark on YARN, Spark on K8s has more advantages than disadvantages.

2. Spark on K8s deployment architecture

There are currently two ways to submit Spark jobs to K8s:

Use native spark-submit

With this approach, no components need to be preinstalled on the K8s cluster. As with YARN submission today, the client that submits the job needs a Spark installation and kubectl (the tool for connecting to the K8s cluster) configured. The submit command then specifies the K8s cluster address and the Spark image to use.

The diagram above details how a job is submitted to K8s with native spark-submit. When the user runs spark-submit on the client, a local process starts and asks the K8s API server to create a Driver Pod. While starting up, the Driver Pod initializes the SparkContext and requests Executor Pods. When the job finishes, the Driver Pod cleans up the Executor Pods. The Driver Pod itself is retained afterwards so that logs and status can be inspected, and it must be cleaned up manually.
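The following is a minimal sketch of such a submit command. It is assembled in Python so that each flag can carry a comment. `--master k8s://<api-server>`, `--deploy-mode cluster`, `spark.executor.instances` and `spark.kubernetes.container.image` are standard Spark options. The API server address, image name and jar path are hypothetical placeholders.

```python
import shlex


def build_spark_submit(api_server: str, image: str, app_jar: str,
                       main_class: str, executors: int = 2) -> list:
    """Assemble a native spark-submit command that targets a K8s cluster."""
    return [
        "spark-submit",
        # The k8s:// prefix selects Spark's Kubernetes scheduler backend.
        "--master", f"k8s://{api_server}",
        # In cluster mode the driver itself runs as a Pod inside K8s.
        "--deploy-mode", "cluster",
        "--name", "spark-pi",
        "--class", main_class,
        "--conf", f"spark.executor.instances={executors}",
        # Image used for both the Driver Pod and the Executor Pods.
        "--conf", f"spark.kubernetes.container.image={image}",
        app_jar,
    ]


cmd = build_spark_submit(
    api_server="https://k8s-apiserver.example.com:6443",  # hypothetical address
    image="registry.example.com/spark:3.1.1",             # hypothetical image
    app_jar="local:///opt/spark/examples/jars/spark-examples.jar",  # hypothetical path
    main_class="org.apache.spark.examples.SparkPi",
)
print(shlex.join(cmd))
```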
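Because finished Driver Pods stay behind, cleanup usually comes down to picking out completed driver Pods. Spark labels the Pods it creates with `spark-role=driver` or `spark-role=executor`. The sketch below filters on that label plus the Pod phase. Its simplified Pod dictionaries are an assumption made for illustration; they are not the real Kubernetes API objects.

```python
# Pod phases after which a Pod will not run again.
TERMINAL_PHASES = {"Succeeded", "Failed"}


def finished_driver_pods(pods: list) -> list:
    """Return the names of completed driver Pods that are safe to delete."""
    return [
        pod["name"]
        for pod in pods
        if pod["labels"].get("spark-role") == "driver"
        and pod["phase"] in TERMINAL_PHASES
    ]


# Simplified, hypothetical Pod records.
pods = [
    {"name": "spark-pi-driver", "labels": {"spark-role": "driver"}, "phase": "Succeeded"},
    {"name": "etl-driver", "labels": {"spark-role": "driver"}, "phase": "Running"},
    {"name": "etl-exec-1", "labels": {"spark-role": "executor"}, "phase": "Running"},
]
print(finished_driver_pods(pods))  # ['spark-pi-driver']
```

On a real cluster you can list the same candidates with a label selector, for example `kubectl get pods -l spark-role=driver`, and then check each Pod's status before deleting it.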


Use Spark-on-K8s-operator

Spark-on-K8s-operator is an open-source component from Google. It needs a resident Pod deployed in the K8s cluster in advance to provide its services. Unlike the first approach, jobs are not submitted from the command line. Instead, the user writes a YAML file and submits it with kubectl. Under the hood the tool still uses spark-submit; the information that would appear on the command line is simply expressed as a file in a different format. On top of the first approach, however, Spark-on-K8s-operator adds auxiliary features, including scheduled (cron) jobs, monitoring, and job management.

In terms of process, the user submits a YAML file. The Spark-on-K8s-operator resident in the cluster watches for this event, parses the file into a spark-submit command, and runs that command to start the Spark job.

Besides the different submission method, the tool provides the auxiliary features mentioned above. Spark-on-K8s-operator intercepts K8s API requests through the K8s Mutating Admission Webhook mechanism, so it can apply custom configuration when Driver and Executor Pods are created. The tool also listens for events from Driver and Executor Pods to track and manage job progress.
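The sketch below shows the general shape of the file the operator consumes: a `SparkApplication` custom resource (`apiVersion: sparkoperator.k8s.io/v1beta2`) whose `spec` carries what would otherwise be spark-submit flags. It builds the resource in Python and prints it as JSON, which kubectl accepts as well as YAML. The namespace, image, jar path and resource sizes are hypothetical.

```python
import json


def spark_application(name: str, image: str) -> dict:
    """Build a SparkApplication custom resource for spark-on-k8s-operator."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": "spark-jobs"},  # hypothetical namespace
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": image,
            "mainClass": "org.apache.spark.examples.SparkPi",
            "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples.jar",
            "sparkVersion": "3.1.1",
            # Driver and executor sizing replaces the equivalent --conf flags.
            "driver": {"cores": 1, "memory": "1g", "serviceAccount": "spark"},
            "executor": {"cores": 1, "instances": 2, "memory": "2g"},
        },
    }


manifest = spark_application("spark-pi", "registry.example.com/spark:3.1.1")
# Save the output to a file and submit it with `kubectl apply -f <file>`.
print(json.dumps(manifest, indent=2))
```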
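The translation step can be pictured as a function from a (simplified) `SparkApplication` spec to spark-submit arguments. The sketch below is a hypothetical illustration of that idea. It is not the operator's actual code, which is written in Go and handles many more fields.

```python
def to_spark_submit(spec: dict, master: str) -> list:
    """Translate a simplified SparkApplication spec into spark-submit arguments."""
    args = ["spark-submit", "--master", master, "--deploy-mode", spec["mode"]]
    if "mainClass" in spec:
        args += ["--class", spec["mainClass"]]
    # Fields from the driver/executor sections become --conf flags.
    confs = {
        "spark.kubernetes.container.image": spec["image"],
        "spark.driver.cores": spec["driver"]["cores"],
        "spark.driver.memory": spec["driver"]["memory"],
        "spark.executor.instances": spec["executor"]["instances"],
        "spark.executor.cores": spec["executor"]["cores"],
        "spark.executor.memory": spec["executor"]["memory"],
    }
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    args.append(spec["mainApplicationFile"])
    return args


spec = {
    "mode": "cluster",
    "image": "registry.example.com/spark:3.1.1",  # hypothetical image
    "mainClass": "org.apache.spark.examples.SparkPi",
    "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples.jar",
    "driver": {"cores": 1, "memory": "1g"},
    "executor": {"cores": 1, "instances": 2, "memory": "2g"},
}
print(" ".join(to_spark_submit(spec, "k8s://https://kubernetes.default.svc")))
```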
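To make the webhook mechanism concrete, here is a minimal sketch of what any mutating admission webhook returns to the API server. The response is an `admission.k8s.io/v1` `AdmissionReview` that echoes the request `uid` and carries a base64-encoded JSONPatch. The toleration being injected is a hypothetical example of a "custom configuration". The operator's own webhook logic is more involved and is not reproduced here.

```python
import base64
import json

# Hypothetical taint that dedicated Spark nodes are assumed to carry.
SPARK_TOLERATION = {"key": "dedicated", "operator": "Equal",
                    "value": "spark", "effect": "NoSchedule"}


def mutate_pod(review: dict) -> dict:
    """Answer an AdmissionReview for a Pod with a JSONPatch that adds a toleration."""
    request = review["request"]
    pod_spec = request["object"].get("spec", {})
    patch = []
    # A JSONPatch append ("/-") needs an existing array, so create it first if absent.
    if "tolerations" not in pod_spec:
        patch.append({"op": "add", "path": "/spec/tolerations", "value": []})
    patch.append({"op": "add", "path": "/spec/tolerations/-", "value": SPARK_TOLERATION})
    return {
        "apiVersion": "admission.k8s.io/v1",
        "kind": "AdmissionReview",
        "response": {
            "uid": request["uid"],  # must echo the uid of the request
            "allowed": True,
            "patchType": "JSONPatch",
            "patch": base64.b64encode(json.dumps(patch).encode()).decode(),
        },
    }


review = {"request": {"uid": "1234-abcd",
                      "object": {"kind": "Pod", "spec": {"containers": []}}}}
response = mutate_pod(review)
print(json.loads(base64.b64decode(response["response"]["patch"])))
```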


3. Community progress for Spark on K8s

Before Spark 2.3, some people tried to support Spark on K8s by deploying YARN on K8s. In essence, though, Spark still ran under YARN's resource control, so this was not Spark on K8s in the full sense.

In Spark 2.3, the community released native Spark on K8s support for the first time, officially supporting this deployment mode.

Spark 2.4 added a small number of feature improvements. The real improvements came in Spark 3, and Spark 3.1 declared the feature generally available (GA). Spark on K8s is a very active direction right now, so if you are interested, we recommend upgrading directly to Spark 3.1 to try this deployment mode.

4. Key features of Spark on K8s

Optimize Spark Pod configuration properties

K8s Pod definitions are usually written in YAML. Early on, Driver and Executor Pod definitions could only be configured through Spark conf. This was inflexible, because not every Pod setting can be expressed through Spark conf. Spark 3.0 added support for template files: users create a template file that defines the Pod's properties and pass it in through Spark configuration. This is more convenient and flexible than setting options one by one.
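A minimal sketch of the template workflow, assuming an executor template that adds tolerations. Tolerations are one example of a Pod setting with no dedicated Spark conf key, and the taint values here are hypothetical. The file is passed in through `spark.kubernetes.executor.podTemplateFile`. Its driver counterpart, `spark.kubernetes.driver.podTemplateFile`, works the same way.

```python
import os
import tempfile

# Executor Pod template in YAML. Spark merges its own settings on top of it.
EXECUTOR_TEMPLATE = """\
apiVersion: v1
kind: Pod
spec:
  tolerations:
    - key: dedicated
      operator: Equal
      value: spark
      effect: NoSchedule
"""


def template_args(template_dir: str) -> list:
    """Write the template and return the spark-submit flags that reference it."""
    path = os.path.join(template_dir, "executor-template.yaml")
    with open(path, "w") as f:
        f.write(EXECUTOR_TEMPLATE)
    return ["--conf", f"spark.kubernetes.executor.podTemplateFile={path}"]


with tempfile.TemporaryDirectory() as tmp:
    print(template_args(tmp))
```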

Dynamic Resource Allocation

In Spark 2, dynamic resource allocation required the External Shuffle Service (ESS). With ESS, the service takes over all shuffle data an executor produces, so the executor can be reclaimed at any time once it finishes. However, ESS is normally run by YARN's NodeManager and is hard to deploy on K8s.

Spark 3 supports shuffle tracking, which lets Spark track executors itself and achieve dynamic resource allocation without ESS. The drawback is that executors cannot be released during the shuffle read stage. They must be kept so that reducers can read their shuffle data, and an executor is marked as releasable only after a GC on the driver side cleans up that shuffle. As a result, resources are released inefficiently.
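The configuration below enables this Spark 3 mode. It is followed by a deliberately simplified model of the release rule: an executor still holding live (not yet GC-cleaned) shuffle data falls under the shuffle-tracking timeout, which defaults to infinity, instead of the normal idle timeout. The timeout values are illustrative, and the model ignores cached RDD blocks.

```python
import math

# Spark 3 settings for dynamic allocation without an external shuffle service.
DYNAMIC_ALLOCATION_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    # How long an executor without tracked shuffle data may idle before release.
    "spark.dynamicAllocation.executorIdleTimeout": "60s",
    # Optional cap on how long an executor holding shuffle data is kept
    # (Spark's default is infinity: wait until the shuffle is cleaned up).
    "spark.dynamicAllocation.shuffleTracking.timeout": "30min",
}


def releasable(idle_seconds: float, holds_live_shuffle: bool,
               idle_timeout: float = 60, shuffle_timeout: float = math.inf) -> bool:
    """Simplified model of when shuffle tracking lets an idle executor go."""
    timeout = shuffle_timeout if holds_live_shuffle else idle_timeout
    return idle_seconds >= timeout


print(releasable(120, holds_live_shuffle=False))  # True: idle, no shuffle data
print(releasable(120, holds_live_shuffle=True))   # False: kept for reducers
```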

Node decommissioning

In K8s environments, node scale-in and reclamation of preemptible instances are common. This happens especially when some Spark jobs are deprioritized to make room for higher-priority workloads. If executors simply exit in this situation, stages may have to be recomputed, which prolongs Spark job runtime. Spark 3.1 provides graceful decommissioning. Before an Executor Pod is forcibly taken offline, it tells the driver to stop assigning it new tasks and migrates its cached data and shuffle files to other Executor Pods. This preserves the job's efficiency and avoids recomputation.

This feature is still experimental and is not enabled by default.
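Since the feature is off by default, it has to be switched on explicitly. The Spark 3.1 configuration keys are listed first. They are followed by a toy model of the effect: the decommissioned executor receives no more work, and its blocks move to the survivors. The round-robin placement and the block names are assumptions made for illustration; Spark chooses migration peers itself.

```python
# Spark 3.1 settings that enable graceful executor decommissioning.
DECOMMISSION_CONF = {
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.enabled": "true",
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
    "spark.storage.decommission.rddBlocks.enabled": "true",
}


def decommission(blocks_by_executor: dict, victim: str) -> dict:
    """Toy model: stop using `victim` and move its blocks to the survivors."""
    survivors = sorted(e for e in blocks_by_executor if e != victim)
    if not survivors:
        raise RuntimeError("no executor left to receive migrated blocks")
    # The victim is dropped from the result, so no new tasks can land on it.
    result = {e: list(blocks_by_executor[e]) for e in survivors}
    for i, block in enumerate(blocks_by_executor[victim]):
        result[survivors[i % len(survivors)]].append(block)  # round-robin placement
    return result


before = {"exec-1": ["shuffle_0_0"], "exec-2": ["rdd_1_0"],
          "exec-3": ["shuffle_0_1", "rdd_1_1"]}
print(decommission(before, "exec-3"))
# {'exec-1': ['shuffle_0_0', 'shuffle_0_1'], 'exec-2': ['rdd_1_0', 'rdd_1_1']}
```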

PersistentVolumeClaim reuse

A PersistentVolumeClaim (PVC) is a K8s storage request that each Pod can explicitly mount. Spark 3.1 supports creating PVCs dynamically, so they no longer need to be declared in advance and can be requested and mounted on demand during execution. In Spark 3.1, however, a PVC's life cycle is tied to its Executor. If the preemptive forced shutdown described above occurs, the data stored on the PVC is lost and must be recomputed. Spark 3.2 therefore supports PVC reuse and ties the PVC's life cycle to the Driver. This avoids requesting storage again and recomputing data, which preserves overall efficiency.
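A sketch of the relevant settings, shown as a Python dict that is turned into `--conf` flags. `claimName=OnDemand` asks Spark to create the PVC dynamically (Spark 3.1+). The two `spark.kubernetes.driver.*PersistentVolumeClaim` keys (Spark 3.2+) make the driver own the claims and reuse them. The volume name follows Spark's `spark-local-dir-` prefix convention so that Spark uses the volume for local shuffle and spill data. The StorageClass name and size are hypothetical.

```python
# Per-executor volume; the "spark-local-dir-" prefix marks it as a Spark local dir.
VOLUME = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"

PVC_CONF = {
    f"{VOLUME}.options.claimName": "OnDemand",        # create the PVC dynamically
    f"{VOLUME}.options.storageClass": "standard-ssd",  # hypothetical StorageClass
    f"{VOLUME}.options.sizeLimit": "100Gi",            # hypothetical size
    f"{VOLUME}.mount.path": "/data",
    f"{VOLUME}.mount.readOnly": "false",
    # Spark 3.2+: PVCs are owned by the driver, so they outlive a lost executor...
    "spark.kubernetes.driver.ownPersistentVolumeClaim": "true",
    # ...and can be reattached to a replacement executor instead of recreated.
    "spark.kubernetes.driver.reusePersistentVolumeClaim": "true",
}


def to_submit_args(conf: dict) -> list:
    """Render a conf dict as spark-submit --conf flags."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(PVC_CONF)))
```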


Spark on K8s Optimization and Best Practices in Alibaba Cloud EMR

Next, I will share with you the optimization and best practices of Alibaba Cloud EMR for Spark on K8s.

1. Introduction to Spark on ACK

ACK: Alibaba Cloud Container Service for Kubernetes.

EMR: E-MapReduce, Alibaba Cloud's open-source big data platform.

On Alibaba Cloud's public cloud, our EMR on ACK product offers Spark clusters, referred to below as Spark on ACK. Spark on ACK is a semi-managed big data platform. Users first need their own ACK cluster, that is, a K8s cluster. We then create a namespace for Spark jobs in that cluster and install fixed component Pods such as spark-operator and the History Server, and subsequent Spark job Pods also run in this namespace. These job Pods can run on the user's own ACK nodes, or on our Elastic Container Instance (ECI) for pay-as-you-go billing. ECI is described in detail next.

2. Elastic benefits on the cloud

Spark's biggest advantage in the cloud is elasticity. In Alibaba Cloud's ACK environment, the Elastic Container Instance (ECI) product means that requesting a Pod no longer consumes the resources of our own nodes. Instead, Pods are created on cloud resources, start within seconds, and are billed per second. I think running Spark jobs on ECI is very cost-effective. Spark usually runs batch jobs that peak in the early morning, with only a few queries during the day, and this pronounced peak-and-valley pattern suits fast elasticity and pay-as-you-go billing well. In addition, ECI can use spot (preemptible) instances, which come with a one-hour protection period. Combined with Spark's node decommissioning feature, this can save a lot of cost.
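The peak-and-valley argument can be checked with simple arithmetic. The sketch compares capacity provisioned for the daily peak with capacity paid for only while it runs. The demand profile (a six-hour nightly peak at 100 cores, 10 cores otherwise) is entirely hypothetical, and the comparison counts core-hours only, ignoring price differences between node types and spot discounts.

```python
def core_hours(hourly_demand: list) -> tuple:
    """Return (fixed, elastic) core-hours for one day of hourly core demand."""
    fixed = max(hourly_demand) * len(hourly_demand)  # sized for the peak, all day
    elastic = sum(hourly_demand)                     # pay only for what actually runs
    return fixed, elastic


# Hypothetical profile: 6-hour nightly batch peak, light daytime load.
demand = [100] * 6 + [10] * 18
fixed, elastic = core_hours(demand)
print(fixed, elastic, round(elastic / fixed, 3))  # 2400 780 0.325
```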

3. Optimizing shuffle and dynamic resources with RSS

Spark shuffle depends heavily on local storage. In the cloud, however, where storage and compute are separated, a machine can hardly guarantee a local disk, the required cloud disk size cannot be predicted, and cost-performance is poor. In addition, without ESS, Spark's native dynamic resource allocation releases executors inefficiently, which can waste resources that cannot be reclaimed.

Spark shuffle itself also has several drawbacks:
- As mapper output grows, data spills to local disk, which causes extra I/O.
- Reducers pull data from mappers concurrently, which generates a large number of random reads and lowers efficiency.
- The shuffle creates numMapper × numReducer network connections, which consumes too much CPU and causes performance and stability problems.
- Shuffle data is stored as a single copy, so any data loss requires recomputation, which wastes resources.

Alibaba Cloud provides an independently deployed RSS (Remote Shuffle Service). It is open source on GitHub and can be connected directly to ACK, so users no longer need to worry about whether local disks are available for shuffle data. Natively, Spark stores shuffle data on executors' local disks; with RSS, RSS manages the shuffle data instead. Push-based external shuffle services are by now an industry consensus, and many companies are optimizing in this direction. The benefits are numerous:
- Executors can be reclaimed as soon as they finish, saving resources.
- RSS turns the traditional mass of random reads into append writes and sequential reads, further addressing Spark shuffle's efficiency problems.
- RSS supports HA deployment and multi-replica mode, which reduces the likelihood of recomputation and further safeguards Spark job efficiency.
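The cost of the pull model grows with the product of mapper and reducer counts. The sketch computes the number of logical block fetches and the average block size when data is spread evenly. The job size (1,000 × 1,000 tasks, 100 GiB of shuffle data) is a hypothetical example. Real Spark pools connections and batches requests, so this counts logical fetches rather than literal TCP connections.

```python
def shuffle_fetches(num_mappers: int, num_reducers: int) -> int:
    """Block fetches in a classic pull shuffle: every reducer reads every mapper."""
    return num_mappers * num_reducers


def avg_block_bytes(total_shuffle_bytes: int, num_mappers: int, num_reducers: int) -> float:
    """Average size of each fetched block if data is spread evenly."""
    return total_shuffle_bytes / shuffle_fetches(num_mappers, num_reducers)


print(shuffle_fetches(1000, 1000))                        # 1000000
print(round(avg_block_bytes(100 * 1024**3, 1000, 1000)))  # 107374 bytes, about 105 KiB
```

A million reads of roughly 100 KiB each, scattered across many disks, is what turns the reduce side into a random-read workload.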
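A toy, in-memory illustration of the push-based idea. Mappers push records to a shared service that appends them per partition, and each reducer then reads one consolidated partition sequentially. Once a mapper has pushed its output, its executor could exit. The class and the integer-key partitioner are hypothetical. The sketch leaves out replication, HA, and the disk layout a real RSS uses.

```python
from collections import defaultdict


class ToyShuffleService:
    """In-memory stand-in for a push-based remote shuffle service."""

    def __init__(self):
        self._partitions = defaultdict(list)  # partition id -> appended records

    def push(self, partition: int, records: list) -> None:
        # Mapper side: append-only write into the partition's data.
        self._partitions[partition].extend(records)

    def read(self, partition: int) -> list:
        # Reducer side: one sequential read of a consolidated partition.
        return list(self._partitions[partition])


def run_map_task(service: ToyShuffleService, rows: list, num_partitions: int) -> None:
    buffers = defaultdict(list)
    for key, value in rows:
        buffers[key % num_partitions].append((key, value))  # hypothetical partitioner
    for partition, records in buffers.items():
        service.push(partition, records)


svc = ToyShuffleService()
run_map_task(svc, [(1, "a"), (2, "b"), (3, "c")], num_partitions=2)  # mapper 1
run_map_task(svc, [(4, "d"), (5, "e")], num_partitions=2)            # mapper 2
# Both mappers could now exit: their output lives in the service, not on local disk.
print(svc.read(0))  # [(2, 'b'), (4, 'd')]
print(svc.read(1))  # [(1, 'a'), (3, 'c'), (5, 'e')]
```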

4. Enhanced K8s job-level scheduling

The default K8s scheduler works at Pod granularity, whereas Spark jobs are traditionally scheduled per application, and starting one application launches multiple Pods. As a result, when many Spark jobs are submitted at once, many Driver Pods may start and all wait for Executor Pods that cannot be scheduled, deadlocking the entire cluster. In addition, K8s supports multi-tenant scenarios poorly: it lacks flexible scheduling between tenants and dynamic quotas. Compared with YARN's scheduling policies, K8s offers only a single policy, default priority plus FIFO, and cannot do fair scheduling.
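A simplified model of the deadlock and of the usual fix, which is application-level ("gang") admission. Pod-level admission starts every driver that fits and leaves no room for executors. App-level admission starts an application only if its driver and all of its executors fit. This is an illustration of the idea, not the default scheduler's actual algorithm or ACK's implementation. The cluster size and application shapes are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class SparkApp:
    name: str
    driver_cores: int
    executor_cores: int
    num_executors: int

    @property
    def total_cores(self) -> int:
        return self.driver_cores + self.executor_cores * self.num_executors


def pod_level(apps: list, capacity: int) -> tuple:
    """Pod-granularity model: start every driver that fits; executors get leftovers."""
    free, started = capacity, []
    for app in apps:
        if app.driver_cores <= free:
            free -= app.driver_cores
            started.append(app.name)
    return started, free


def app_level(apps: list, capacity: int) -> tuple:
    """Gang model: admit an app only if its driver and all executors fit."""
    free, admitted = capacity, []
    for app in apps:
        if app.total_cores <= free:
            free -= app.total_cores
            admitted.append(app.name)
    return admitted, free


apps = [SparkApp(f"app-{i}", driver_cores=2, executor_cores=2, num_executors=2)
        for i in range(1, 5)]
print(pod_level(apps, capacity=8))  # (['app-1', 'app-2', 'app-3', 'app-4'], 0)
print(app_level(apps, capacity=8))  # (['app-1'], 2)
```

In the pod-level case four drivers hold all 8 cores and none of them can get a single 2-core executor, so nothing makes progress. In the app-level case `app-1` runs to completion and frees its cores for the next application.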

Alibaba Cloud ACK has been enhanced in this regard:

During scheduling, it first checks whether there are enough resources for the whole job, which resolves the possible deadlock described above.

Multi-tenant tree (hierarchical) queues are implemented on top of namespaces. Each queue can have upper and lower resource limits, and resources can be preempted between queues.

Spark jobs are dispatched through priority queues at application granularity, with fairness between queues. Extensions based on Spark-on-K8s-operator place submitted jobs into the queues automatically.
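The queueing behaviour described in the list above can be modelled in a few dozen lines. In this sketch, each queue has a guaranteed (`min_cores`) and a maximum (`max_cores`) share, and a job fits only if no ancestor exceeds its maximum. Within a queue the highest priority goes first; across queues, the queue furthest below its guarantee goes first. This is a hypothetical model of the behaviour, not ACK's scheduler, and it does not model preemption.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Queue:
    name: str
    min_cores: int   # guaranteed share (must be > 0 for leaf queues)
    max_cores: int   # hard upper limit
    parent: Optional["Queue"] = None
    used: int = 0
    pending: List[Tuple[int, str, int]] = field(default_factory=list)  # (priority, app, cores)

    def fits(self, cores: int) -> bool:
        """A job fits only if this queue and every ancestor stay within max_cores."""
        q = self
        while q is not None:
            if q.used + cores > q.max_cores:
                return False
            q = q.parent
        return True

    def charge(self, cores: int) -> None:
        q = self
        while q is not None:
            q.used += cores
            q = q.parent


def schedule_next(leaves: List[Queue]) -> Optional[Tuple[str, str]]:
    """Start one whole application (app granularity), or return None."""
    candidates = []
    for q in leaves:
        if not q.pending:
            continue
        job = max(q.pending)  # highest priority within the queue
        if q.fits(job[2]):
            # Fairness between queues: favour the one furthest below its guarantee.
            candidates.append((q.used / q.min_cores, -job[0], q.name, job))
    if not candidates:
        return None
    _, _, name, job = min(candidates)
    queue = next(q for q in leaves if q.name == name)
    queue.pending.remove(job)
    queue.charge(job[2])
    return name, job[1]


root = Queue("root", min_cores=100, max_cores=100)  # hypothetical cluster capacity
etl = Queue("root.etl", min_cores=40, max_cores=80, parent=root)
adhoc = Queue("root.adhoc", min_cores=20, max_cores=40, parent=root)
etl.pending = [(1, "nightly-etl", 30), (5, "urgent-fix", 10)]
adhoc.pending = [(1, "report", 10)]
print([schedule_next([etl, adhoc]) for _ in range(4)])
# [('root.etl', 'urgent-fix'), ('root.adhoc', 'report'), ('root.etl', 'nightly-etl'), None]
```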

5. Data lake storage and acceleration on the cloud

In a K8s environment, using OSS (Object Storage Service) as data lake storage suits the storage-compute separation architecture better than a traditional Hadoop cluster does. Spark on ACK includes the Jindo SDK for seamless access to OSS.

Fluid supports cache acceleration for Spark on K8s deployments and can improve run speed by about 30% in TPC-DS scenarios.

6. Build a data lake on the cloud using DLF

Any Hadoop ecosystem component you want to use on K8s also has to be deployed separately. Spark on ACK, however, integrates seamlessly with Alibaba Cloud DLF (Data Lake Formation). DLF provides a unified metadata service with permission control and auditing, data ingestion into the lake, interactive analysis with Spark SQL, and data lake management features that support storage analysis and cost optimization.

7. Usability improvements

Spark on ACK provides a CLI tool that accepts spark-submit syntax directly and also records each job in spark-operator for management. As discussed earlier, the two submission methods each have pros and cons. spark-operator offers good job management, but its submission format is incompatible with the old command syntax and cannot run interactive shells, which makes migrating from existing clusters cumbersome. With our tool, users get the advantages of both methods at once, which is a significant usability improvement.

For log collection, Spark on ACK provides a log collection solution and, through the History Server, lets users view logs in a web UI just as with Spark on YARN.
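The core idea of such a tool, accepting familiar spark-submit arguments but producing an operator-managed resource, can be sketched as a small argument mapper. The talk does not describe how the ACK CLI is implemented, so the function below is hypothetical. It handles only a few flags and assumes `type: Scala` and cluster mode as defaults. The master address is left out because the target cluster is implied.

```python
def submit_args_to_app(argv: list) -> dict:
    """Map a subset of spark-submit arguments onto a SparkApplication resource."""
    if argv and argv[0] == "spark-submit":
        argv = argv[1:]
    metadata, spec = {}, {"type": "Scala", "mode": "cluster", "sparkConf": {}}
    i = 0
    while i < len(argv):
        flag = argv[i]
        if flag == "--conf":
            key, _, value = argv[i + 1].partition("=")
            spec["sparkConf"][key] = value
        elif flag == "--name":
            metadata["name"] = argv[i + 1]
        elif flag == "--class":
            spec["mainClass"] = argv[i + 1]
        elif flag == "--deploy-mode":
            spec["mode"] = argv[i + 1]
        elif flag.startswith("--"):
            raise ValueError(f"flag not handled in this sketch: {flag}")
        else:
            # The first positional argument is the application; the rest are its args.
            spec["mainApplicationFile"] = flag
            spec["arguments"] = argv[i + 1:]
            break
        i += 2
    return {"apiVersion": "sparkoperator.k8s.io/v1beta2", "kind": "SparkApplication",
            "metadata": metadata, "spec": spec}


app = submit_args_to_app([
    "spark-submit", "--name", "spark-pi",
    "--class", "org.apache.spark.examples.SparkPi",
    "--conf", "spark.executor.instances=2",
    "local:///opt/spark/examples/jars/spark-examples.jar", "100",
])
print(app["spec"]["arguments"])  # ['100']
```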

This concludes today’s sharing, thank you.


01/ Speaker

Fan Youlun

Alibaba Cloud

Technical Expert, Open Source Big Data Department

Responsible for R&D of the Alibaba Cloud EMR Spark on ACK product.

