Click to follow “Like Coder”
Get more technical dry goods~~
Department: Data middle office
With the growth of data warehouse data volume, data lineage or data Provence is increasingly important for data analysis, through the data lineage can be traced to the upstream and downstream relationships of table-table, table-task, task-task, to support the problem data traceability, islanding data offline needs.
At present, the bloodline resolution of SQL tasks has been supported based on ANTLR syntax analysis, and the bloodline of Spark App tasks is still through manual configuration. We hope to be able to supplement the analysis of Spark App tasks and improve the bloodline logic.
At present, the online Spark App tasks support Spark 2.3, Spark 3.1 two versions, and support python2/3, java, scala type, running platform respectively support yarn and k8s, bloodline collection mechanism needs to consider the adaptation of all the above tasks.
2. Design ideas
The parsing ideas for Spark App tasks usually have the following three categories:
Based on code analysis: through the parsing of the logic of the Spark App to achieve the purpose of blood analysis, similar products are SPROV.
Based on dynamic listening: Titian  and Pebble  who modify the code to collect bloodlines at runtime, or Spline  and Apache Atlas  who collect bloodlines at runtime through plug-ins.
Log-based parsing: By analyzing event log information such as Spark App, and then resolving the lineage of the task.
Because Spark App is written in a variety of ways, code-based parsing needs to consider java, python, scala, which is too complicated, we first consider log-based analysis. By analyzing the history of the task event log of spark3 and spark2, it is found that the event log of spark2 does not have the complete metadata information related to the hive table, while spark3 prints out the hive table element information on the basis of various read operators such as FileSourceScanExec and HiveTableScan, so the event log method cannot support spark2 perfectly.
Based on this, we finally adopted a dynamic listening-based approach, and researched spline, and conducted usability analysis. The following describes the use and design principles of spline.
3. Spline-based lineage analysis scheme
spline (Spark Lineage) is a free and open source Spark lineage collection system based on the Apache 2.0 protocol. The system is mainly divided into three parts: spline agent, spline server and spline ui.
Here is mainly to introduce the principle of spline agent, because this is the part responsible for bloodline analysis, as for the spline server and ui is responsible for the collection and display of bloodlines, you can replace it with the internal system.
The general architecture diagram is shown in the following figure:
spline supports two initialization methods, codeless and programmatic. Essentially registering a QueryExecutionListener to listen to SparkListenerSQLExecutionEnd messages.
Codeless init is a configuration that embeds the user’s Spark APP without modifying the code. By registering a QueryExecutionListener listener, you can receive and process Spark messages. Launch configurations such as:
The spline agent jar package address is executed by –jars, or it can be placed in the jars directory of the spark deployment by default.
Specify the spline properties file via –files, or specify the configuration item directly through –conf, which requires the addition of spark. Prefix.
Listeners can be registered by –conf “spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener”.
The programmatic init needs to open bloodline resolution as shown in the code, for example
The bloodline resolution logic is in the SplineAgent.handle() method, by calling LineageHarvester.harvest(), to obtain the final bloodline, and give it to the LineageDispatcher to output the result.
The message QueryExecution can be obtained through the SparkListenerSQLExecutionEnd message, and the bloodline resolution is based on the analyzed logical plan and executedPlan in QueryExecution, and the LineageHarvester.harvest() logic is handled as follows:
tryExtractWriteCommand (logicalPlan) is responsible for parsing the write operations in the logicalPlan, and the resolution of the write operations relies on the plug-in method.
By getting the plugin of the WriteNodeProcessing type in the PluginRegistry, getting the write operation in the logicalPlan, and by parsing the specific Command, you can get the table name information of the hive table, for example. The final information is encapsulated as a WriteCommand data structure.
For example, DataSourceV2Plugin.writeNodeProcessor() will be responsible for the parsing of the V2WriteCommand, CreateTableAsSelect, and ReplaceTableAsSelect commands.
The parsing plugin can be extended by itself, enriching the data source parsed by spline, the plugin needs to inherit za.co.absa.spline.harvester.plugin.Plugin, spline agent will automatically load all plugins in classpath at startup.
After parsing to writeCommand, the read operation is parsed based on the query field in writeCommand. Read operations are recursively resolved based on the logicalPlan part of query.
Finally, after the parsing is completed, you can get the two json information of plan and event, plan is the blood relationship, and event is additional auxiliary information.
The LineageDispatcher decides how the bloodline is sent, and the built-in Dispatcher implementation is self-explanatory. For example, HttpLineageDispatcher is to send the bloodline to an HTTP interface, KafkaLineageDispatcher is sent to a Kafka topic, and LoggingLineageDispatcher is to print the bloodline in the stderr log of the Spark APP for debugging confirmation.
If you want to customize the dispatcher, you can inherit the LineageDispatcher yourself, and provide a constructor with the input parameter org.apache.commons.configuration.Configuration as follows:
The post processing filter can be post-processed before being handed over to the dispatcher after the bloodline resolution is complete, such as desensitization. Implementing a filter requires the implementation of za.co.absa.spline.harvester.postprocessing.PostProcessingFilter, which accepts an input parameter of type org.apache.commons.configuration.Configuration. The configuration is as follows:
pyspark 2.3 If you want to support codeless init, you need to hit a patch SPARK-23228, related questions can refer to this ISSUE ( https://github.com/AbsaOSS/spline-spark-agent/issues/490 ).
Compile the spline-agent jar package corresponding to the spark and scala versions.
Such as spark 3.1
Deploy the spark agent jar package in the /path/to/spark/jars directory.
To integrate with the existing system, modify the code appropriately, add the workflow or task name corresponding to the Spark APP in the final event message, send the bloodline and task information to the custom HTTP server, parse the bloodline report kafka, and unify consumption processing.
The spline server side can be started with one click via docker-compose. Visit the spline ui to see the resolved bloodline.
Currently, spline agents have some bloodline scenarios that cannot be handled, as follows:
The source logic in the RDD cannot be resolved, and if the dataframe is converted to an RDD for operation, the lineage after this cannot be traced. This is related to the recursion of spline parsing through the child relationship in logicalPlan, and encountering the end of LogicalRDD recursion. A possible solution is to find the lineage information by resolving the dependency relationship of the RDD when encountering the LogicalRDD operator.
2. Bloodline resolution is based on write triggering, so if the task only does queries, it will not parse the bloodline
Although there are still some shortcomings, but the spline agent can unconsciously increase the bloodline analysis ability for the Spark APP program running online, which is a very good idea, and the follow-up can be based on this direction to further study to optimize the bloodline accuracy of the Spark APP.
Collecting and visualizing data lineage of Spark jobs