
From the "Flink from beginner to proficient" series of articles

Source: https://blog.csdn.net/hjw199089/article/details/77938688

Job, Stage, Task

  • Worker Node: a physical node on which Executors run
  • Executor: a process started on a Worker Node for an application; it runs the application's tasks
  • Job: an action triggers a job; the job is submitted to the DAGScheduler and decomposed into stages
  • Stage: the DAGScheduler divides a job into stages at shuffle boundaries; the tasks within one stage share the same shuffle dependencies. There are two types, shuffle map stages and result stages:

      > shuffle map stage: its tasks' results are input for other stage(s)
      > result stage: its tasks directly compute a Spark action (e.g. count(), save(), etc.) by running a function on an RDD

  • Task: a unit of work sent to an executor; it is simply the processing of a single data partition

Summary: an action triggers a job, and each task corresponds to processing the data of one partition (see the sketch below):

stage1 (multiple tasks sharing the same shuffle dependency) -> [map shuffle] -> stage2 -> [result shuffle] -> ...
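Before walking through the full example, here is a minimal word-count style sketch (not from the original post; the input path and SparkContext setup are assumed) showing how these terms map onto code:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal illustration of job/stage/task (assumed standalone snippet, local mode):
val sc = new SparkContext(new SparkConf().setAppName("terms-demo").setMaster("local"))
val counts = sc.textFile("input.txt")      // assumed input file
  .flatMap(_.split(" "))
  .map(w => (w, 1))                        // narrow transformations: stay in the same stage
  .reduceByKey(_ + _)                      // shuffle dependency: stage boundary is drawn here
val n = counts.count()                     // action: triggers exactly one job
// The part before the shuffle becomes a ShuffleMapStage, the part computing count()
// becomes the ResultStage, and each stage runs one task per partition of its RDD.
sc.stop()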

[Example Description]

The execution process is analyzed with the example below; for ease of analysis, map and reduce are deliberately not written as chained calls.
The file data.txt is as follows; each row is event, uuid, pv:

1  1001  0
1  1001  1
1  1002  0
1  1003  1
2  1002  1
2  1003  1
2  1003  0
3  1001  0
3  1001  0

Calculate the uv and pv for each event, i.e. under the event dimension compute count(distinct if(pv > 0, uuid, null)) and sum(pv).

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hjw on 17/9/11.
  *
  * data.txt (event  uuid  pv):
  *   1  1001  0
  *   1  1001  1
  *   1  1002  0
  *   1  1003  1
  *   2  1002  1
  *   2  1003  1
  *   2  1003  0
  *   3  1001  0
  *   3  1001  0
  *
  * Compute, per event, count(distinct if(pv > 0, uuid, null)) and sum(pv):
  *   2  UV=2  PV=2
  *   3  UV=0  PV=0
  *   1  UV=2  PV=2
  *
  * For ease of analysis, map and reduce are not written as chained calls.
  */


object DAG {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    val txtFile = sc.textFile(".xxxxxx/DAG/srcFile/data.txt")
    val inputRDD = txtFile.map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2).toInt))

    // For analyzing the number of tasks: partitions.size is 1 before repartitioning,
    // so the first stage below runs 1 task
    val partitionsSize = inputRDD.partitions.size

    val inputPartionRDD = inputRDD.repartition(2)
    // ------ map_shuffle stage, has shuffle read

    // Result: (event-user, pv)
    val eventUser2PV = inputPartionRDD.map(x => (x._1 + "-" + x._2, x._3))

    // Result: (event, (user, pv))
    val PvRDDTemp1 = eventUser2PV.reduceByKey(_ + _).map(x =>
      (x._1.split("-")(0), (x._1.split("-")(1), x._2))
    )
    // ------ map_shuffle stage, has shuffle read and shuffle write

    // Result: (event, (whether the user appears, the user's pv))
    val PvUvRDDTemp2 = PvRDDTemp1.map(
      x => x match {
        case x if x._2._2 > 0  => (x._1, (1, x._2._2))
        case x if x._2._2 == 0 => (x._1, (0, x._2._2))
      })

    // Result: (event, (uv, pv))
    val PVUVRDD = PvUvRDDTemp2.reduceByKey(
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    // ------ result_shuffle stage, has shuffle read

    // -------- triggers a job
    val res = PVUVRDD.collect()

    // ------ result_shuffle stage, has shuffle read
    // -------- triggers a job
    PVUVRDD.foreach(a => println(a._1 + "\t UV=" + a._2._1 + "\t PV=" + a._2._2))
    //    2  UV=2  PV=2
    //    3  UV=0  PV=0
    //    1  UV=2  PV=2

    // keep the application alive (e.g. to inspect the Spark UI), then stop
    while (true) {
      ;
    }
    sc.stop()
  }
}
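The comments in the example note that map and reduce are deliberately not chained. For comparison, a chained version of the same computation might look like the sketch below (not from the original post; it assumes the same sc and input path as above):

// Chained sketch of the same uv/pv computation (assumes sc and data.txt from the example above):
val result = sc.textFile(".xxxxxx/DAG/srcFile/data.txt")
  .map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2).toInt))
  .repartition(2)
  .map(x => (x._1 + "-" + x._2, x._3))                                   // (event-user, pv)
  .reduceByKey(_ + _)                                                    // pv per event-user
  .map(x => (x._1.split("-")(0), (if (x._2 > 0) 1 else 0, x._2)))        // (event, (seen, pv))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))                     // (event, (uv, pv))
  .collect()

Chaining does not change the stage division: the shuffle boundaries (repartition and the two reduceByKey calls) still determine the stages.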

The key analysis points selected from the log are as follows:

17/09/11 22:46:19 INFO SparkContext: Running Spark version 1.6
17/09/11 22:46:21 INFO SparkUI: Started SparkUI at http://192.168.2.100:4040

17/09/11 22:46:21 INFO FileInputFormat: Total input paths to process : 1

17/09/11 22:46:22 INFO SparkContext: Starting job: collect at DAG.scala:69
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 3 (repartition at DAG.scala:42)
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 7 (map at DAG.scala:46)
17/09/11 22:46:22 INFO DAGScheduler: Registering RDD 10 (map at DAG.scala:55)

17/09/11 22:46:22 INFO DAGScheduler: Got job 0 (collect at DAG.scala:69) with 2 output partitions

17/09/11 22:46:22 INFO DAGScheduler: Final stage: ResultStage 3 (collect at DAG.scala:69)

17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at repartition at DAG.scala:42), which has no missing parents

17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 0 (repartition at DAG.scala:42) finished in 0.106 s
17/09/11 22:46:22 INFO DAGScheduler: waiting: Set(ShuffleMapStage 1, ShuffleMapStage 2, ResultStage 3)

17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[7] at map at DAG.scala:46), which has no missing parents

17/09/11 22:46:22 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[7] at map at DAG.scala:46)
17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 1 (map at DAG.scala:46) finished in 0.055 s

17/09/11 22:46:22 INFO DAGScheduler: waiting: Set(ShuffleMapStage 2, ResultStage 3)

17/09/11 22:46:22 INFO DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[10] at map at DAG.scala:55), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ShuffleMapStage 2 (map at DAG.scala:55) finished in 0.023 s

17/09/11 22:46:22 INFO DAGScheduler: Submitting ResultStage 3 (ShuffledRDD[11] at reduceByKey at DAG.scala:63), which has no missing parents

17/09/11 22:46:22 INFO DAGScheduler: ResultStage 3 (collect at DAG.scala:69) finished in 0.009 s
17/09/11 22:46:22 INFO DAGScheduler: Job 0 finished: collect at DAG.scala:69, took 0.283076 s

17/09/11 22:46:22 INFO SparkContext: Starting job: foreach at DAG.scala:74

17/09/11 22:46:22 INFO DAGScheduler: Got job 1 (foreach at DAG.scala:74) with 2 output partitions

17/09/11 22:46:22 INFO DAGScheduler: Final stage: ResultStage 7 (foreach at DAG.scala:74)

17/09/11 22:46:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
17/09/11 22:46:22 INFO DAGScheduler: Submitting ResultStage 7 (ShuffledRDD[11] at reduceByKey at DAG.scala:63), which has no missing parents
17/09/11 22:46:22 INFO DAGScheduler: ResultStage 7 (foreach at DAG.scala:74) finished in 0.010 s
17/09/11 22:46:22 INFO DAGScheduler: Job 1 finished: foreach at DAG.scala:74, took 0.028036 s

The collect() at line 69 and the foreach() at line 74 trigger two jobs:

val res = PVUVRDD.collect()
PVUVRDD.foreach(a => println(a._1 + "\t UV=" + a._2._1 + "\t PV=" + a._2._2))

Based on the figure below:

job0 has 4 stages and 7 tasks in total:

  • stage0: 1 partition, executed by 1 task
  • stage1 to stage3: 2 partitions each, executed by 2 tasks each, i.e. 6 tasks in total

job1 has 1 stage (it reuses the RDDs already computed by job0) and 2 tasks.

Details of job0:

Details of job0-stage0:

1  1001  0
1  1001  1
1  1002  0
1  1003  1
2  1002  1
2  1003  1
2  1003  0
3  1001  0
3  1001  0

9 records are read as input; since there is only one partition, a single task executes this stage and writes the shuffle data for the repartition.

Execution process analysis:

An action triggers the job. Taking val res = PVUVRDD.collect() as the starting point, the job execution process can be traced in reverse:

1. The action calls SparkContext.runJob(), which in turn calls dagScheduler.runJob(rdd, func, partitions, ...).

2. Inside DAGScheduler.runJob(), submitJob() is called and a waiter is returned so the caller can listen for the job status:
   val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
   runJob() then waits on this waiter (which carries an atomic jobId) until the job completes.

3. submitJob() builds the waiter as a JobWaiter (a JobListener):
   val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
   and posts the job (now holding a jobId) into the event-processing queue eventProcessLoop:
   eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
   where
   eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop
   and EventLoop is backed by a LinkedBlockingDeque.

4. Putting a new event into eventProcessLoop wakes up DAGSchedulerEventProcessLoop.onReceive(), which executes doOnReceive(). doOnReceive(event: DAGSchedulerEvent) dispatches on the concrete event type (for example JobSubmitted or MapStageSubmitted) to the corresponding handler, e.g. JobSubmitted => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties):

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  ...
}

5. handleJobSubmitted() builds the stages backwards, starting from the ResultStage:
   • it creates the final stage: finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
   • it activates the job: val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
   • it builds the missing parent stages in reverse via getMissingParentStages(finalStage) (to be expanded later)

6. Once the DAG of stages is built, submitStage(finalStage) is called. submitStage() turns a stage into tasks via submitMissingTasks(), which creates ShuffleMapTasks or ResultTasks depending on whether the stage is a ShuffleMapStage or a ResultStage; finally taskScheduler.submitTasks() launches the concrete tasks:

private def submitStage(stage: Stage) {
  val missing = getMissingParentStages(stage).sortBy(_.id)  // ==== the reverse stage division starts here, see below
  logDebug("missing: " + missing)
  if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    submitMissingTasks(stage, jobId.get)
  }
  ...
}

private def getMissingParentStages(stage: Stage): List[Stage] = { ... }

taskScheduler.submitTasks: to be continued…
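Several of the steps above hinge on the eventProcessLoop. The following simplified, self-contained sketch (an illustration of the post-and-dispatch pattern, not the actual Spark source) shows the idea of posting events into a LinkedBlockingDeque and dispatching them on a dedicated thread, analogous to what onReceive()/doOnReceive() do:

import java.util.concurrent.LinkedBlockingDeque

// Hypothetical, simplified event loop for illustration only.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class MapStageSubmitted(jobId: Int) extends SchedulerEvent

class SimpleEventLoop {
  private val queue = new LinkedBlockingDeque[SchedulerEvent]()

  // Dedicated thread: take events off the deque and dispatch them.
  private val eventThread = new Thread("simple-event-loop") {
    override def run(): Unit = {
      try {
        while (true) onReceive(queue.take())   // blocks until an event is posted
      } catch {
        case _: InterruptedException =>        // stop() interrupts the thread to end the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit  = eventThread.interrupt()

  // Producers (like submitJob) only enqueue an event and return immediately.
  def post(event: SchedulerEvent): Unit = queue.put(event)

  // Dispatch on the concrete event type, analogous to doOnReceive().
  private def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(id)      => println(s"handleJobSubmitted($id)")
    case MapStageSubmitted(id) => println(s"handleMapStageSubmitted($id)")
  }
}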

Reverse analysis of how the TaskScheduler works

After the DAGScheduler has divided the job into stages, each stage is committed as tasks: working backwards from the final stage through the stage's RDDs, the RDDs of a stage are packaged into a TaskSet, i.e. one task per partition, and each task executes the corresponding function on its partition of the RDD.

The details are as follows:

Continuing from the end of the previous section, a stage is submitted in the DAGScheduler through submitMissingTasks(stage, jobId.get), as shown below; this submits each stage:

private def submitStage(stage: Stage) {
  val missing = getMissingParentStages(stage).sortBy(_.id)  // ==== the reverse stage division starts here; submitMissingTasks() does the actual submission, creating different task types depending on the stage type
  ...
}
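To make the "one task per partition" point concrete, the small sketch below (assuming the inputPartionRDD with 2 partitions from the example above) reports how many records each partition, and therefore each task, processes:

// Sketch: mapPartitionsWithIndex exposes the partition index a task works on.
val perPartition = inputPartionRDD
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
perPartition.foreach { case (idx, n) => println(s"partition $idx -> 1 task, $n records") }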
