1.背景:

血缘关系非常重要,因为有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。所以考虑将采用MapReduce引擎执行的sql进行迭代,以spark引擎执行。但同时也需要实现字段血缘的功能。hive血缘关系实现较为简单,攻略也比较多,这spark血缘关系攻略较少,这里提供一种解析思路。

2.需求:

在使用spark引擎执行sql时,将表与表,字段与字段的血缘信息解析出来,可视化展示。

3.思路:

使用QueryExecutionListener对spark进行监听,读取出sparkplan(物理计划),解析其中包含的血缘关系,将血缘关系导入neo4j,spring-boot写接口,前端请求返回表的血缘关系。

4.实现:

QueryExecutionListener:监听和用于分析spark-sql执行过程中的的一些指标

The interface of query execution listener that can be used to analyze execution metrics.

trait QueryExecutionListener {@DeveloperApidef onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit@DeveloperApidef onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
}
class SparkListenerTest extends QueryExecutionListener{override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {val sparkPlanJson: String = qe.sparkPlan.prettyJson}
}

了解一下整个sql在spark中的解析过程

sql —(ANTLR4) —> AST —(Spark AstBuilder) —> Unresolved LogicalPlan —  (Catalog) —>  Resolved LogicalPlan  — (Optimizer) —> Optimized LogicalPlan — (SparkPlanner) —> PhysicalPlan(SparkPlan) —(prepareForExecution) —> ExecutionPlan(PhysicalPlan)

QueryExecution可以获取到的信息如下:

logicalPlan: 逻辑计划并不知道如何执行,比如不知道表的类型(hive还是hbase),如何获取数据(jdbc还是读取hdfs),数据分布是什么样的(bucketed还是hashDistrobuted),这时就需要将逻辑计划树转换成物理计划树,获取真实的物理属性

sparkPlan:就是刚刚提到的物理计划树,这里我们监听获取sparkplan的Json信息

这段JSON获取目标表的所有信息

table和database

第一个Project包含了目标表的信息和血缘信息(下面说),其中projectList就是表中的字段信息,第一个Project就是insert into xxx 的 xxx(目标表target)

spark会给每个字段打上一个唯一自增的id(logicalPlan打上的),我们将信息存放在map里(id -> database.table.column)

这里很重要,因为就是使用id将字段和字段之间关联的

LogicalRelation包含了源头表的所有信息,output中包含了字段信息,catalogTable中包含了表名和库名(多个源头表就会有多个LogicalRelation)

字段信息和id 

表名和库名

上面说的第一个Project非常重要,因为第一个Project不仅仅包含了目标表的字段信息,还包含了这些字段来自哪个字段,去刚刚存放的map里匹配,将结果放入新map(sourcecolumn -> targetcolumn)

代码:

class SparkListenerTest extends QueryExecutionListener{override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {//    println(qe.analyzed.prettyJson)//    println("-------------------------------")//    println(qe.logical.prettyJson)//    println("-------------------------------")//    println(qe.optimizedPlan.prettyJson)//    println("-------------------------------")//    println(qe.sparkPlan.prettyJson)//    println("-------------------------------")//    println(qe.executedPlan.prettyJson)val sparkPlanJson: String = qe.sparkPlan.prettyJson//获取数据写入的相关的json信息,其中可以将血缘关系解析出来val dataWritingCommandExec = JSON.parseArray(sparkPlanJson).get(0)//id与字段的对应关系val allTableInfo = new util.HashMap[String, String]()//目标表var targetDatabaseTable = ""//源头表val sourceDatabaseTables = new util.ArrayList[String]()//目标表与源头表中字段与字段的对应关系val allColumnRelation = new util.HashMap[String, String]()//判断一下是否为insert语句,不是的话不需要解析if(JSON.parseObject(dataWritingCommandExec.toString).getString("class").contains("DataWritingCommandExec")) {val cmd = JSON.parseObject(dataWritingCommandExec.toString).getJSONArray("cmd").toArray()for (c <- cmd) {//这里只写了目标表为hive表的情况 获取目标表if (JSON.parseObject(c.toString).getString("class").contains("InsertIntoHiveTable")) {//table的所有信息val tableInfo = JSON.parseObject(c.toString).getString("table")val identifier = JSON.parseObject(tableInfo).getString("identifier")//table nameval table = JSON.parseObject(identifier).getString("table")//databaseval database = JSON.parseObject(identifier).getString("database")targetDatabaseTable = database + "." + table  //目标表
//          println(targetDatabaseTable)}//获取源头表if (JSON.parseObject(c.toString).getString("class").contains("LogicalRelation")) {//catalogval catalogTable = JSON.parseObject(c.toString).getString("catalogTable")val identifier = JSON.parseObject(catalogTable).getString("identifier")//table nameval table = JSON.parseObject(identifier).getString("table")//databaseval database = JSON.parseObject(identifier).getString("database")//将源头表加入list中sourceDatabaseTables.add(database + "." + table)val output = JSON.parseObject(c.toString).getJSONArray("output").toArray()//获取字段信息for (o <- output) {val detail = JSON.parseObject(JSON.parseArray(o.toString).get(0).toString)//clomun nameval column = (database + "." + table) + "." + detail.getString("name")//唯一id信息val columnId = JSON.parseObject(detail.getString("exprId")).getString("id")allTableInfo.put(columnId, column)}}}
//      println("allTableInfo:" + allTableInfo)//相当于java的 break 因为我们只需要第一个Project的信息,只有第一个Project的中才包含血缘信息;var loop = new Breaks;loop.breakable {for (c <- cmd) {if (JSON.parseObject(c.toString).getString("class").contains("logical.Project")) {val projectList = JSON.parseObject(c.toString).getJSONArray("projectList").toArray()for (p <- projectList) {val project = JSON.parseArray(p.toString)val length = project.size()//目标表和源头表字段名字不一样(roleId -> role_id)if (length > 1) {//获取目标表的字段名val targetColumn = targetDatabaseTable + "." + JSON.parseObject(project.get(0).toString).get("name")
//                println(targetColumn)for (p <- project.toArray()) {if (JSON.parseObject(p.toString).getString("class").contains("AttributeReference")) {//获取与目标表字段对应的源头表字段的字段id,通过字段id获取源头表字段val sourceColumn = allTableInfo.get(JSON.parseObject(JSON.parseObject(p.toString).getString("exprId")).getString("id"))if (!allColumnRelation.containsKey(sourceColumn)) {//放入字段关系map中allColumnRelation.put(sourceColumn, targetColumn)}}} //目标表和源头表字段名字一样(role_id -> role_id)} else {val targetColumn = targetDatabaseTable + "." + JSON.parseObject(project.get(0).toString).get("name")val sourceColumn = allTableInfo.get(JSON.parseObject(JSON.parseObject(project.get(0).toString).getString("exprId")).getString("id"))allColumnRelation.put(sourceColumn, targetColumn)}}loop.break()}}}
//      println("allColumnRelation:" + allColumnRelation)//      println("targetDatabaseTable:" + targetDatabaseTable)//      println("sourceDatabaseTables" + sourceDatabaseTables)}//下面就是 neo4j的语句拼接了 将血缘信息导入到图数据库
if(!targetDatabaseTable.equals("")) {val session: Session = SparkListenerTest.driver.session//创建目标表if (session.run(s"match (t:Table {name:'${targetDatabaseTable}'}) return t").list().size() == 0) {session.run(s"CREATE (n:Table {name:'${targetDatabaseTable}'}) RETURN n")println(s"CREATE (n:Table {name:'${targetDatabaseTable}'}) RETURN n")//        println(s"CREATE (n:Table {name:'${TargetTable}'}) RETURN n")}//创建源头表并创建目标表表与源头表的关系for(sourceDatabaseTable <- sourceDatabaseTables) {if (session.run(s"match (t:Table {name:'${sourceDatabaseTable}'}) return t").list().size() == 0) {session.run(s"CREATE (n:Table {name:'${sourceDatabaseTable}'}) RETURN n")println(s"CREATE (n:Table {name:'${sourceDatabaseTable}'}) RETURN n")}session.run(s"MATCH (a:Table {name:'${sourceDatabaseTable}'}),(b:Table {name:'${targetDatabaseTable}'}) MERGE (a)-[:Derived]->(b)")println(s"MATCH (a:Table {name:'${sourceDatabaseTable}'}),(b:Table {name:'${targetDatabaseTable}'}) MERGE (a)-[:Derived]->(b)")}//判断是否有目标表的列节点,如果没有则创建for(targetColumn <- allColumnRelation.values()) {if (session.run(s"match (c:Column {name:'${targetColumn}'}) return c").list().size() == 0) {session.run(s"CREATE (n:Column {name:'${targetColumn}'}) RETURN n")//        println(s"CREATE (n:Column {name:'${TargetColumn}'}) RETURN n")}val targeTable = targetColumn.split("\\.")(0)+"."+targetColumn.split("\\.")(1)session.run(s"MATCH (a:Column {name:'${targetColumn}'}),(b:Table {name:'${targeTable}'}) MERGE (a)-[:Belongs]->(b)")println(s"MATCH (a:Column {name:'${targetColumn}'}),(b:Table {name:'${targeTable}'}) MERGE (a)-[:Belongs]->(b)")}//判断是否有源头表的列节点,如果没有则创建并创建列关系for(sourceColumn <- allColumnRelation.keySet()) {if (session.run(s"match (c:Column {name:'${sourceColumn}'}) return c").list().size() == 0) {session.run(s"CREATE (n:Column {name:'${sourceColumn}'}) RETURN n")//        println(s"CREATE (n:Column {name:'${TargetColumn}'}) RETURN n")}val sourceTable = sourceColumn.split("\\.")(0)+"."+sourceColumn.split("\\.")(1)session.run(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Table {name:'${sourceTable}'}) MERGE (a)-[:Belongs]->(b)")println(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Table {name:'${sourceTable}'}) MERGE (a)-[:Belongs]->(b)")session.run(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Column {name:'${allColumnRelation.get(sourceColumn)}'}) MERGE (a)-[:Derived_column]->(b)")println(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Column {name:'${allColumnRelation.get(sourceColumn)}'}) MERGE (a)-[:Derived_column]->(b)")}}}
}//创建伴生类,可以理解为java的静态变量,同一个session只需要建立一次driver,不需要每个sql都建立一次
object SparkListenerTest{val driver = GraphDatabase.driver(url, AuthTokens.basic(database, password))
}

提交的时候要修改配置文件

spark-submit

--master xxx --deploy xxx

--executor-cores xxx --executor-memory xxx --num-executor xxx

--conf spark.sql.queryExecutionListeners="xxx.SparkListenerTest"

或者直接修改spark的conf文件 spark.sql.queryExecutionListeners= "xxx.SparkListenerTest"

导入neo4j后的效果 neo4j browser

spark-sql字段级血缘关系实现相关推荐

  1. hive字段级血缘关系实现

    1.数据血缘的概念 https://zhuanlan.zhihu.com/p/133953825 就放一张效果图,可以理解为有向无环图,对于图中的Table X,还可以画出其右半边,即Table X作 ...

  2. mysql血缘 表级血缘 字段级血缘GUDU GSP,JSQL PARSER,ANTLR MYSQL,DRUID

    ** 目的: ** 分析mysql的表级和字段级血缘,本文给出他人源码或示例 工具 GUDU-SQLPARSER GSP JSQL-PARSER antlr DRUID 横向对比 名称 开源 功能 优 ...

  3. 使用马哈鱼SQLFLow 1 分钟获取复杂 SQL 语句的血缘关系

    数据血缘关系在企业的数据治理中是非常重要的一个环节,关于数据血缘在企业数据治理中的重要作用,可以参考这篇文章.SQL 语言在数据处理中被广泛使用,SQL 语句中包含了丰富的数据血缘关系,关于什么是 S ...

  4. 搜狐 Hive SQL 血缘关系解析与应用

    1. 研究背景 随着企业信息化和业务的发展,数据资产日益庞大,数据仓库构建越来越复杂,在数仓构建的过程中,常遇到数据溯源困难,数据模型修改导致业务分析困难等难题,此类问题主要是由于数据血缘分析不足造成 ...

  5. 【文末有惊喜!】Hive SQL血缘关系解析与应用

    本文字数:7860字 预计阅读时间:20分钟 + 1 研究背景 随着企业信息化和业务的发展,数据资产日益庞大,数据仓库构建越来越复杂,在数仓构建的过程中,常遇到数据溯源困难,数据模型修改导致业务分析困 ...

  6. 通过SQL一键解析表字段血缘关系

    可以根据sql文件中的sql快速解析出表字段到字段的血缘关系,支持mysql.oracle.hive.impala.greenplum等数据库sql语法解析,可以在sql文件中放如多段sql解析,也可 ...

  7. 聊聊Hive数据血缘——从Atlas没有列级血缘的Bug讲起

    正文共: 9053字 12图 预计阅读时间: 23分钟 前几天,Datahub提供了最新的字段级别数据血缘功能,很多朋友迫不及待想对比一下Datahub的字段级血缘与Atlas的区别. 这个时候问题来 ...

  8. Spark入门(五)Spark SQL shell启动方式(元数据存储在derby)

    一.spark-sql shell介绍 Spark sql是以hive SQL提交spark任务到spark集群执行. 由于spark是计算框架没有存储功能,所有spark sql数据表映射关系存储在 ...

  9. 解析SQL的表间血缘关系工具

    一.sqllineage SQL Lineage Analysis Tool powered by Python 源码地址:https://github.com/reata/sqllineage 安装 ...

  10. 推荐两则基于解析SQL的表间血缘关系工具

    推荐两则基于解析SQL的表间血缘关系工具 基于大数据脚本,可以用Apache Atlas做元数据的血缘关系分析,很多金融机构仍然在使用老的遗留系统,比如老版本的Hive.数据库存储过程,如何反查表间的 ...

最新文章

  1. Python入门100题 | 第034题
  2. linux 5005端口,Linux配置防火墙端口 8080端口
  3. 循序渐进DB2.DBA系统管理、运维与应用案例pdf
  4. arcgis批量处理nc文件_气象数据处理——nc文件
  5. matlab find
  6. Tensorflow实现MLP
  7. java内部类实现方式_Java内部类详解
  8. envi窗口滤波_高光谱ENVI图像处理之滤波
  9. linux通信加密软件,5个Linux加密工具:VeraCrypt,CipherShed,CryFS,GnuPG,Encfs介绍
  10. java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署
  11. 变频器,变频调速操作控制,QY-TS02
  12. Hadoop之使用LZO压缩并支持分片
  13. python跳出循环的方法_Python 跳出嵌套循环的5种方法
  14. 微软学术搜索的新功能设想:用户账户系统——史经浩
  15. 关于WPS Office安全漏洞情况的通报
  16. 人脸考勤机是如何工作的?人脸考勤机有哪些优缺点?
  17. 计算机常用键盘有几个键失灵,电脑键盘忽然有几个键失灵了
  18. SpringBoot邮件发送(QQ邮箱)
  19. 刷金币全自动脚本 | 让Python每天帮你薅一个早餐钱(送源码)
  20. mipi协议中文详解

热门文章

  1. 了解卡尔曼滤波器2--最优状态估计
  2. Google Earth、百度地图、高德地图数据原来是这么来的
  3. 14.css中的定位的参照物
  4. python 协程是啥_什么是Python中的协程
  5. [zz] 三维动画软件Maya
  6. oracle exclude table,【DATAPUMP】导出时使用exclude排除表
  7. input输入效果控制onfocus和onblur事件(转)
  8. YARN学习总结-第九节-YARN-Web-App-Proxy
  9. c语言输出0.000000或乱码,深究
  10. 常用的空间数据结构(网格/四叉树/八叉树/BSP树/k-d树/BVH/自定义划分)