Source: https://blog.csdn.net/hjw199089/article/details/77938688



Job, Stage, Task

- Worker Node: the physical node on which executors run.
- Executor: a process started on a Worker Node for an application; it runs multiple tasks.
- Job: each action triggers a job, which is submitted to the DAGScheduler and decomposed into stages.
- Stage: the DAGScheduler splits a job into stages at shuffle boundaries; the tasks within one stage share the same shuffle dependencies. There are two kinds of stages: shuffle map stages and result stages.
- Result stage: its tasks directly compute a Spark action (e.g. count(), save(), etc.) by running a function on an RDD.
- Task: the unit of work sent to an executor; put simply, it is one data-processing pass over one data partition.

Summary: an action triggers a job (each task corresponds to the processing of one partition) -> stage1 (its tasks share the same shuffle dependency) -> [map-shuffle] -> stage2 -> [result-shuffle] -> ...
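To make these terms concrete, here is a minimal, self-contained sketch (not part of the original example; the names are chosen for illustration): the transformations only build the RDD lineage, the single action count() triggers exactly one job, and the shuffle introduced by reduceByKey splits that job into a ShuffleMapStage and a ResultStage, each with one task per partition.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: one action -> one job; one shuffle -> two stages; one task per partition.
object JobStageTaskSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))

    val words  = sc.parallelize(Seq("a", "b", "a", "c"), 2) // 2 partitions => 2 tasks per stage
    val paired = words.map(w => (w, 1))                     // narrow dependency: stays in the same stage
    val counts = paired.reduceByKey(_ + _)                  // shuffle: stage boundary

    // Nothing has executed yet; count() is the action that submits job 0,
    // which the DAGScheduler splits into a ShuffleMapStage and a ResultStage.
    println(counts.count())

    sc.stop()
  }
}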

【Example Description】
The execution process is analyzed with the following example; for ease of analysis, map and reduce are not written as a chain.
The data.txt file contains the following records (columns: event, uuid, pv, tab-separated):

1	1001	0
1	1001	1
1	1002	0
1	1003	1
2	1002	1
2	1003	1
2	1003	0
3	1001	0
3	1001	0

Compute uv and pv for each event, i.e. under the event dimension: count(distinct if(pv > 0, uuid, null)), sum(pv).
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hjw on 17/9/11.
  * data.txt (event  uuid  pv): the 9 records listed above.
  * Compute per event: count(distinct if(pv > 0, uuid, null)), sum(pv)
  * Expected output: 2 UV=2 PV=2; 3 UV=0 PV=0; 1 UV=2 PV=2
  * For ease of analysis, map and reduce are not written as a chain.
  */
object DAG {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    val txtFile = sc.textFile(".xxxxxx/DAG/srcFile/data.txt")
    val inputRDD = txtFile.map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2).toInt))

    // To analyze the number of tasks: before repartitioning, partitions.size = 1,
    // so each stage below would have only 1 task.
    val partitionsSize = inputRDD.partitions.size

    // ------ map_shuffle stage, has shuffle read
    val inputPartionRDD = inputRDD.repartition(2)

    // Result: (event-user, pv)
    val eventUser2PV = inputPartionRDD.map(x => (x._1 + "-" + x._2, x._3))

    // Result: (event, (user, pv))   ------ map_shuffle stage, has shuffle read and shuffle write
    val PvRDDTemp1 = eventUser2PV.reduceByKey(_ + _).map(x =>
      (x._1.split("-")(0), (x._1.split("-")(1), x._2))
    )

    // Result: (event, (whether the user appears, the user's pv))
    val PvUvRDDTemp2 = PvRDDTemp1.map(
      x => x match {
        case x if x._2._2 > 0  => (x._1, (1, x._2._2))
        case x if x._2._2 == 0 => (x._1, (0, x._2._2))
      }
    )

    // Result: (event, (uv, pv))   ------ result_shuffle stage, has shuffle read
    val PVUVRDD = PvUvRDDTemp2.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // -------- triggers a job
    val res = PVUVRDD.collect()

    // -------- triggers a job
    PVUVRDD.foreach(a => println(a._1 + "\t UV=" + a._2._1 + "\t PV=" + a._2._2))
    // 2  UV=2  PV=2
    // 3  UV=0  PV=0
    // 1  UV=2  PV=2

    // Keep the application alive (e.g. to inspect the Spark UI); sc.stop() is never reached.
    while (true) {}
    sc.stop()
  }
}
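For contrast, the same pipeline written in the chained style that the author deliberately avoids above might look like the following sketch (logic identical to the code above; it assumes the same sc and input path):

// Chained version of the same uv/pv computation (sketch only).
val pvuv = sc.textFile(".xxxxxx/DAG/srcFile/data.txt")
  .map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2).toInt))
  .repartition(2)
  .map(x => (x._1 + "-" + x._2, x._3))                              // (event-user, pv)
  .reduceByKey(_ + _)                                               // total pv per (event, user)
  .map(x => (x._1.split("-")(0), (if (x._2 > 0) 1 else 0, x._2)))   // (event, (user counted?, pv))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))                // (event, (uv, pv))

pvuv.collect().foreach { case (event, (uv, pv)) => println(event + "\t UV=" + uv + "\t PV=" + pv) }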
The key analysis points selected from the log are as follows:
17/09/11 22:46:19 INFO SparkContext: Running Spark version 1.6.
17/09/11 22:46:21 INFO SparkUI: Started SparkUI at http://192.168.2.100:4040
17/09/11 22:46:21 INFO FileInputFormat: Total input paths to process : 1
17/09/11 22:46:22 INFO SparkContext: Starting job: collect at DAG.scala:69
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 3 (repartition at DAG.scala:42)
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 7 (map at DAG.scala:46)
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 10 (map at DAG.scala:55)
17/09/11 22:46:22 INFO DAGScheduler: Got job 0 (collect at DAG.scala:69) with 2 output partitions
17/09/11 22:46:22 INFO DAGScheduler: Final stage: ResultStage 3 (collect at DAG.scala:69)
17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at repartition at DAG.scala:42), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 0 (repartition at DAG.scala:42) finished in 0.106 s
17/09/11 22:46:22 INFO DAGScheduler: waiting: Set(ShuffleMapStage 1, ShuffleMapStage 2, ResultStage 3)
17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[7] at map at DAG.scala:46), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[7] at map at DAG.scala:46)
17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 1 (map at DAG.scala:46) finished in 0.055 s
17/09/11 22:46:22 INFO DAGScheduler: waiting: Set(ShuffleMapStage 2, ResultStage 3)
17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[10] at map at DAG.scala:55), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 2 (map at DAG.scala:55) finished in 0.023 s
17/09/11 22:46:22 INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[11] at reduceByKey at DAG.scala:63), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ResultStage 3 (collect at DAG.scala:69) finished in 0.009 s
17/09/11 22:46:22 INFO DAGScheduler: Job 0 finished: collect at DAG.scala:69, took 0.283076 s
17/09/11 22:46:22 INFO SparkContext: Starting job: foreach at DAG.scala:74
17/09/11 22:46:22 INFO DAGScheduler: Got job 1 (foreach at DAG.scala:74) with 2 output partitions
17/09/11 22:46:22 INFO DAGScheduler: Final stage: ResultStage 7 (foreach at DAG.scala:74)
17/09/11 22:46:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
17/09/11 22:46:22 INFO DAGScheduler: Submitting ResultStage 7 (ShuffledRDD[11] at reduceByKey at DAG.scala:63), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ResultStage 7 (foreach at DAG.scala:74) finished in 0.010 s
17/09/11 22:46:22 INFO DAGScheduler: Job 1 finished: foreach at DAG.scala:74, took 0.028036 s
The collect() at DAG.scala:69 and the foreach() at DAG.scala:74 trigger two jobs:

val res = PVUVRDD.collect()
PVUVRDD.foreach(a => println(a._1 + "\t UV=" + a._2._1 + "\t PV=" + a._2._2))
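Since collect() and foreach() are separate actions, job 1 re-runs the lineage of PVUVRDD (in this run it can at least reuse the shuffle output of job 0, which is why job 1 only needs its ResultStage, as analyzed below). When a result is consumed by several actions, caching it is a common option; a small sketch using the same names as the code above:

// Cache the final RDD so later actions read it from memory instead of
// recomputing it or re-reading shuffle output.
PVUVRDD.cache()

val res = PVUVRDD.collect()   // first action: computes the partitions and populates the cache
PVUVRDD.foreach(a => println(a._1 + "\t UV=" + a._2._1 + "\t PV=" + a._2._2)) // second action: served from the cache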

Based on the figure below:

- job0 has 4 stages and 7 tasks in total:
  - stage0: 1 partition, so 1 task
  - stage1-3: 2 partitions each, so 2 tasks each, 6 tasks in total
- job1 has 1 stage (it reuses the RDDs of job0) and 2 tasks
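The same stage structure can also be read off the lineage without the UI: RDD.toDebugString prints the chain of RDDs with indentation at shuffle boundaries, which is one way to cross-check the stage count above (a sketch using the final RDD from the example):

// Each indentation level / ShuffledRDD in the output marks a shuffle boundary,
// i.e. a stage boundary of the job that an action on PVUVRDD would run.
println(PVUVRDD.toDebugString)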

Details of job0:

Details of job0, stage0:
The input is the 9 records of data.txt listed above (event, uuid, pv). Since the input RDD has only one partition, stage0 runs as a single task, which writes the shuffle data for the repartition.
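To see how the 9 records are laid out before and after repartition(2), and therefore how many tasks each stage gets, one can print the partition counts and dump the partition contents; a small sketch using the RDDs from the example (it triggers extra jobs of its own):

// 1 partition before the repartition -> 1 task in stage0;
// 2 partitions afterwards -> 2 tasks in each of the later stages.
println("partitions before: " + inputRDD.partitions.size +
        ", after: " + inputPartionRDD.partitions.size)

inputPartionRDD
  .mapPartitionsWithIndex((idx, it) => it.map(record => (idx, record)))
  .collect()
  .foreach(println)   // prints (partitionIndex, (event, uuid, pv))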







Process analysis:

An action triggers the job. Take val res = PVUVRDD.collect() as the starting point and analyze the job execution process in reverse:

1. The action calls SparkContext.runJob(), which in turn calls dagScheduler.runJob(rdd, func, number of partitions, other arguments).
2. DAGScheduler.runJob() calls submitJob() and returns a waiter that listens to the job status:
   val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
   The waiter (which carries an atomic jobId) is then watched to see whether the job completes.
3. Inside submitJob(), the returned waiter is a JobWaiter (a JobListener):
   val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
   The job (now holding a jobId) is posted to the event-processing queue eventProcessLoop, which is backed by a LinkedBlockingDeque:
   eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
   where eventProcessLoop = new DAGSchedulerEventProcessLoop(this), and
   class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop, whose backing structure is the LinkedBlockingDeque.
4. When a new event is put into eventProcessLoop, DAGSchedulerEventProcessLoop.onReceive() is invoked, which executes doOnReceive(). Based on the concrete type of the DAGSchedulerEvent (e.g. a JobSubmitted or MapStageSubmitted event), doOnReceive(event: DAGSchedulerEvent) calls the corresponding handler to submit the job, e.g.
   JobSubmitted => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties):

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  ...
}

5. handleJobSubmitted() builds the stages starting from the ResultStage:
   - it creates the final stage: finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
   - it creates the active job: val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
   - and it starts building the missing parent stages in reverse via getMissingParentStages(finalStage) (to be expanded).
6. Once the DAG of stages is built, submitStage(finalStage) is called. submitStage() turns a stage into tasks via submitMissingTasks(); depending on whether the stage is a ShuffleMapStage or a ResultStage, submitMissingTasks() creates ShuffleMapTasks or ResultTasks, and taskScheduler.submitTasks() then launches the concrete tasks:

private def submitStage(stage: Stage) {
  ...
  val missing = getMissingParentStages(stage).sortBy(_.id)  // ==== the reverse division starts here, see below
  logDebug("missing: " + missing)
  if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    submitMissingTasks(stage, jobId.get)
  }
  ...
}

private def getMissingParentStages(stage: Stage): List[Stage] = {
  ...
}


taskScheduler.submitTasks(): to be continued…
Reverse analysis of how the TaskScheduler works

The DAGScheduler commits each stage as tasks: walking the stage's chain of RDDs in reverse, the RDD partitions are packaged into a TaskSet (each partition becomes one task), and the corresponding function is executed on the RDD.

The details are as follows. Continuing from the end of the previous section, submitting a stage in DAGScheduler goes through submitMissingTasks(stage, jobId.get), called from submitStage() for each stage:

private def submitStage(stage: Stage) {
  ...
  val missing = getMissingParentStages(stage).sortBy(_.id)  // ==== the reverse division starts here; for the concrete
  // division see submitMissingTasks() below, which creates different task types depending on the type of stage
  ...
}
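Conceptually, submitMissingTasks() then turns each partition of the stage into one task, with the task type chosen by the stage type (ShuffleMapTask for a ShuffleMapStage, ResultTask for a ResultStage). A highly simplified, illustrative sketch of just that idea; the types and fields below are invented for the illustration and are not Spark's real classes:

// Illustrative only: one task per partition, task type chosen by the stage type.
sealed trait SketchTask
case class SketchShuffleMapTask(stageId: Int, partition: Int) extends SketchTask
case class SketchResultTask(stageId: Int, partition: Int) extends SketchTask

def tasksFor(stageId: Int, numPartitions: Int, isResultStage: Boolean): Seq[SketchTask] =
  (0 until numPartitions).map { p =>
    if (isResultStage) SketchResultTask(stageId, p) else SketchShuffleMapTask(stageId, p)
  }

// e.g. the 2-partition ResultStage 3 of job 0 would yield 2 result tasks:
// tasksFor(stageId = 3, numPartitions = 2, isResultStage = true)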