Parsing Field-Level Lineage in Spark

  • 1. No more talk, straight to the code
  • 2. Package the project as a jar and integrate it with Spark
  • 3. My modifications to the project
    • 1. Project structure
  • 4. The final cleaned results

1. No more talk, straight to the code

package com.roundyuan.sparkagent

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeserializeToObject, Distinct, Filter, Generate, GlobalLimit, InsertIntoTable, Join, LocalLimit, LocalRelation, LogicalPlan, MapElements, MapPartitions, Project, Repartition, RepartitionByExpression, SerializeFromObject, SubqueryAlias, TypedFilter, Union, Window}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.mutable
import scala.collection.mutable.{ListBuffer, Map}
import scala.util.control.NonFatal

/**
 * author: zhangmengyuan
 * Works on the analyzed plan you would otherwise see with "explain extended".
 * Field-level lineage is derived by parsing the analyzed logical execution plan.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {

  // there should be only one target table
  private val targetTable: Map[Long, String] = Map()
  // there may be several source tables
  private val sourceTables: Map[Long, String] = Map()
  // field-to-field relations collected while walking the plan
  private val fieldProcess: Map[Long, mutable.Set[Long]] = Map()
  // compacted lineage: only source-table fields mapped to target-table fields
  private val fieldLineage: Map[String, mutable.Set[String]] = mutable.Map()
  // SQL types to consider: insert select, create as
  private var processType: String = ""

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // on success, parse the executed logical plan
    lineageParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
  }

  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    // parse the analyzed plan and record the source and target tables
    val analyzedLogicPlan = qe.analyzed
    resolveLogicV2(analyzedLogicPlan)
    // connect source fields to target fields
    connectSourceFieldAndTargetField()
    println(fieldLineage)
  }

  /**
   * @param plan the analyzed logical plan
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    // source tables come from LogicalRelation or HiveTableRelation; target tables from InsertIntoHiveTable and CreateHiveTableAsSelectCommand
    // field transformations come from Aggregate and Project
    plan.collect {
      case plan: LogicalRelation => {
        val calalogTable = plan.catalogTable.get
        val tableName = calalogTable.database + "." + calalogTable.identifier.table
        plan.output.foreach(columnAttribute => {
          val columnFullName = tableName + "." + columnAttribute.name
          sourceTables += (columnAttribute.exprId.id -> columnFullName)
        })
      }
      case plan: HiveTableRelation => {
        val tableName = plan.tableMeta.database + "." + plan.tableMeta.identifier.table
        plan.output.foreach(columnAttribute => {
          val columnFullName = tableName + "." + columnAttribute.name
          sourceTables += (columnAttribute.exprId.id -> columnFullName)
        })
      }
      case plan: InsertIntoHiveTable => {
        val tableName = plan.table.database + "." + plan.table.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: InsertIntoHadoopFsRelationCommand => {
        val catalogTable: CatalogTable = plan.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: CreateHiveTableAsSelectCommand => {
        val tableName = plan.tableDesc.database + "." + plan.tableDesc.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: Aggregate => {
        plan.aggregateExpressions.foreach(aggItem => {
          extFieldProcess(aggItem)
        })
      }
      case plan: Project => {
        plan.projectList.toList.foreach(pojoItem => {
          extFieldProcess(pojoItem)
        })
      }
      // case `plan` => logInfo("******child plan******:\n" + plan)
    }
  }

  def extFieldProcess(namedExpression: NamedExpression): Unit = {
    // an alias means a transformation happened; otherwise the value is carried through unchanged
    if ("alias".equals(namedExpression.prettyName)) {
      val sourceFieldId = namedExpression.exprId.id
      val targetFieldIdSet: mutable.Set[Long] = fieldProcess.getOrElse(sourceFieldId, mutable.Set.empty)
      namedExpression.references.foreach(attribute => {
        targetFieldIdSet += attribute.exprId.id
      })
      fieldProcess += (sourceFieldId -> targetFieldIdSet)
    }
  }

  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    plan.output.foreach(columnAttribute => {
      val columnFullName = tableName + "." + columnAttribute.name
      targetTable += (columnAttribute.exprId.id -> columnFullName)
    })
  }

  /**
   * Extract lineage from the collected relations: for every target-table field, check whether it maps
   * directly to a source-table field; if not, follow the transformation chain until a source field is found.
   */
  def connectSourceFieldAndTargetField(): Unit = {
    val fieldIds = targetTable.keySet
    fieldIds.foreach(fieldId => {
      val resTargetFieldName = targetTable(fieldId)
      val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
      if (sourceTables.contains(fieldId)) {
        val sourceFieldId = sourceTables.getOrElse(fieldId, "")
        resSourceFieldSet += sourceFieldId
      } else {
        val targetIdsTmp = findSourceField(fieldId)
        resSourceFieldSet ++= targetIdsTmp
      }
      fieldLineage += (resTargetFieldName -> resSourceFieldSet)
    })
  }

  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
    if (fieldProcess.contains(fieldId)) {
      val fieldIds: mutable.Set[Long] = fieldProcess.getOrElse(fieldId, mutable.Set.empty)
      fieldIds.foreach(fieldId => {
        if (sourceTables.contains(fieldId)) {
          resSourceFieldSet += sourceTables(fieldId)
        } else {
          val sourceFieldSet = findSourceField(fieldId)
          resSourceFieldSet ++= sourceFieldSet
        }
      })
    }
    resSourceFieldSet
  }
}

Test code

package com.roundyuan.sparkagent

import org.apache.spark.sql.SparkSession

object TestHive {

  def main(args: Array[String]): Unit = {
    // connect to the Hive data warehouse
    val sparkSession = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    val listenV3 = new FlQueryExecutionListener()
    sparkSession.listenerManager.register(listenV3)

    // sparkSession.sql("show databases").show()
    sparkSession.sql("insert into test.test_orc select id,count(name) as num from test.test02 group by id").show()
    // val user_log = sparkSession.sql("select * from dbtaobao.user_log").collect()
    // val test = user_log.map(row => "user_id"+row(0))
    // test.map(row => println(row))
  }
}
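For the insert ... select statement above, the println(fieldLineage) call at the end of lineageParser prints a map from each target column to the set of source columns it derives from. Assuming test.test02(id, name) and a matching test.test_orc table exist, the console output has roughly this shape (illustrative only; the exact names come from the analyzed plan):

Map(test.test_orc.id -> Set(test.test02.id), test.test_orc.num -> Set(test.test02.name))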

The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roundyuan</groupId>
    <artifactId>roundyuan-spark-lineage</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.7</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.16</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Package the project as a jar and integrate it with Spark

1. Build the jar first: mvn clean install
2. Copy the jar into Spark's jars directory
3. In spark-defaults.conf, configure:
   spark.sql.execution.arrow.enabled true
   spark.sql.queryExecutionListeners com.roundyuan.sparkagent.FlQueryExecutionListener
4. Any SQL you then execute will print the field-level dependencies
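If you would rather not edit spark-defaults.conf, the same listener can be enabled per job. A minimal sketch (the jar path and the job jar are placeholders; depending on your deploy mode you may also need to put the jar on the driver classpath):

spark-submit \
  --jars /path/to/roundyuan-spark-lineage-1.0-SNAPSHOT.jar \
  --conf spark.sql.queryExecutionListeners=com.roundyuan.sparkagent.FlQueryExecutionListener \
  your-spark-job.jar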

3. My modifications to the project

1. Project structure


<1>. The resources directory mainly contains five files (core-site.xml, druid.properties, hdfs-site.xml, hive-site.xml, yarn-site.xml);
<2>. entity is my entity class

package com.roundyuan.sparkagent.entity;

import lombok.Data;
import org.joda.time.DateTime;

@Data
public class TableLineageInfo {
    private String SourceTableNme;
    private String SourceDatabaseNme;
    private String SourceColNme;
    private String TargetTableNme;
    private String TargetDatabaseNme;
    private String TargetColNme;
    private DateTime updateDate;
}

<3>. The utils package
The code is as follows:

package com.roundyuan.sparkagent.utils;

import java.sql.*;

public class DruidInsert {

    public static void insert(String SourceTableNme, String SourceDatabaseNme, String SourceColNme,
                              String TargetTableNme, String TargetDatabaseNme, String TargetColNme) {
        Connection conn = null;
        PreparedStatement pst = null;
        if (DruidSelect.select(TargetTableNme, TargetColNme) == false) {
            try {
                // insert a new lineage record
                // conn = JdbcUtil.getConnection();
                String url = "redacted (private)"; // replace with the IP address and port of your own PostgreSQL instance
                String user = "postgres";
                String password = "123456"; // set your own password, or leave it empty
                try {
                    Class.forName("org.postgresql.Driver");
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
                conn = DriverManager.getConnection(url, user, password);
                String sql = "insert into TableLineageInfo(SourceTableNme,SourceDatabaseNme,SourceColNme,TargetTableNme,TargetDatabaseNme,TargetColNme,updateDate) values(?,?,?,?,?,?,?)";
                // create the statement
                pst = conn.prepareStatement(sql);
                // bind the parameters
                pst.setString(1, SourceTableNme);
                pst.setString(2, SourceDatabaseNme);
                pst.setString(3, SourceColNme);
                pst.setString(4, TargetTableNme);
                pst.setString(5, TargetDatabaseNme);
                pst.setString(6, TargetColNme);
                pst.setTimestamp(7, new Timestamp(new java.util.Date().getTime()));
                // execute
                // pst.addBatch();
                int i = pst.executeUpdate();
                // System.out.println(i);
            } catch (SQLException e) {
                // e.printStackTrace();
            } finally {
                // release resources
                // JdbcUtil.close(pst, conn);
                try {
                    if (pst != null) pst.close();
                    if (conn != null) conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        } else {
            DruidUpdate.update(SourceTableNme, SourceDatabaseNme, SourceColNme, TargetTableNme, TargetDatabaseNme, TargetColNme);
        }
    }
}
package com.roundyuan.sparkagent.utils;

import java.sql.*;

public class DruidSelect {

    // returns true if a lineage record for this target table/column already exists
    public static Boolean select(String TargetTableNme, String TargetColNme) {
        Connection conn = null;
        PreparedStatement pst = null;
        try {
            // conn = JdbcUtil.getConnection();
            String url = "redacted (private)"; // replace with the IP address and port of your own PostgreSQL instance
            String user = "postgres";
            String password = "123456"; // set your own password, or leave it empty
            try {
                Class.forName("org.postgresql.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            conn = DriverManager.getConnection(url, user, password);
            String sql = "select * from TableLineageInfo where TargetTableNme=? and TargetColNme=?";
            // create the statement
            pst = conn.prepareStatement(sql);
            // bind the parameters
            pst.setString(1, TargetTableNme);
            pst.setString(2, TargetColNme);
            // execute and check whether a matching row came back
            ResultSet rs = pst.executeQuery();
            return rs.next();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // release resources
            // JdbcUtil.close(pst, conn);
            try {
                if (pst != null) pst.close();
                if (conn != null) conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return false;
    }
}
package com.roundyuan.sparkagent.utils;

import java.sql.*;

public class DruidUpdate {

    public static void update(String SourceTableNme, String SourceDatabaseNme, String SourceColNme,
                              String TargetTableNme, String TargetDatabaseNme, String TargetColNme) {
        Connection conn = null;
        PreparedStatement pst = null;
        try {
            // update an existing lineage record
            // conn = JdbcUtil.getConnection();
            String url = "redacted (private)"; // replace with the IP address and port of your own PostgreSQL instance
            String user = "postgres";
            String password = "123456"; // set your own password, or leave it empty
            try {
                Class.forName("org.postgresql.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            conn = DriverManager.getConnection(url, user, password);
            String sql = "UPDATE TableLineageInfo SET SourceTableNme=?,SourceDatabaseNme=?,SourceColNme=?,TargetTableNme=?,TargetDatabaseNme=?,TargetColNme=?,updateDate=? WHERE TargetTableNme=? AND TargetDatabaseNme=?";
            // create the statement
            pst = conn.prepareStatement(sql);
            // bind the parameters
            pst.setString(1, SourceTableNme);
            pst.setString(2, SourceDatabaseNme);
            pst.setString(3, SourceColNme);
            pst.setString(4, TargetTableNme);
            pst.setString(5, TargetDatabaseNme);
            pst.setString(6, TargetColNme);
            pst.setTimestamp(7, new Timestamp(new java.util.Date().getTime()));
            pst.setString(8, TargetTableNme);
            pst.setString(9, TargetDatabaseNme);
            // execute
            System.out.println("*************** updated *****************");
            pst.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // release resources
            try {
                if (pst != null) pst.close();
                if (conn != null) conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

<4>. FlQueryExecutionListener is my core class. It captures the analyzed plan (the same plan shown by explain extended) and derives the field-to-field lineage by parsing the logical execution plan.

package com.roundyuan.sparkagent

import com.roundyuan.sparkagent.utils.DruidInsert
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.mutable
import scala.collection.mutable.Map
import scala.util.control.NonFatal

/**
 * author: zhangmengyuan
 * Works on the analyzed plan you would otherwise see with "explain extended".
 * Field-level lineage is derived by parsing the analyzed logical execution plan.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {

  // unused here; database access goes through DruidInsert
  private val conn = null
  private val pst = null
  // there should be only one target table
  private val targetTable: Map[Long, String] = Map()
  // there may be several source tables
  private val sourceTables: Map[Long, String] = Map()
  // field-to-field relations collected while walking the plan
  private val fieldProcess: Map[Long, mutable.Set[Long]] = Map()
  // compacted lineage: only source-table fields mapped to target-table fields
  private val fieldLineage: Map[String, mutable.Set[String]] = mutable.Map()
  // SQL types to consider: insert select, create as
  private var processType: String = ""

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // on success, parse the executed logical plan
    lineageParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
  }

  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    // parse the analyzed plan and record the source and target tables
    val analyzedLogicPlan = qe.analyzed
    resolveLogicV2(analyzedLogicPlan)
    // connect source fields to target fields
    connectSourceFieldAndTargetField()
    // println(fieldLineage)
    // write every (target field -> source field) pair to the lineage table, skipping entries with an empty source set
    for (i <- fieldLineage) {
      if (i.toString().split(",")(1).length != 6) {
        // println(i.toString().split(",")(0).substring(1) + "<-" + i.toString().split(",")(1).substring(4, i.toString().split(",")(1).length - 2))
        var aa = i.toString().toLowerCase().split(",")(0).substring(1)
        var tdb = aa.split("\\.")(0)
        var ttb = aa.split("\\.")(1)
        var tfd = aa.split("\\.")(2)
        var bb = i.toString().toLowerCase().split(",")(1).substring(4, i.toString().split(",")(1).length - 2)
        var sdb = bb.split("\\.")(0)
        var stb = bb.split("\\.")(1)
        var sfd = bb.split("\\.")(2)
        DruidInsert.insert(stb, sdb, sfd, ttb, tdb, tfd)
      }
    }
    // println(fieldLineage)
  }

  /**
   * @param plan the analyzed logical plan
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    // source tables come from LogicalRelation or HiveTableRelation; target tables from InsertIntoHiveTable and CreateHiveTableAsSelectCommand
    // field transformations come from Aggregate and Project
    plan.collect {
      case plan: LogicalRelation => {
        val calalogTable = plan.catalogTable.get
        val tableName = calalogTable.database + "." + calalogTable.identifier.table
        plan.output.foreach(columnAttribute => {
          val columnFullName = tableName + "." + columnAttribute.name
          sourceTables += (columnAttribute.exprId.id -> columnFullName)
        })
      }
      case plan: HiveTableRelation => {
        val tableName = plan.tableMeta.database + "." + plan.tableMeta.identifier.table
        plan.output.foreach(columnAttribute => {
          val columnFullName = tableName + "." + columnAttribute.name
          sourceTables += (columnAttribute.exprId.id -> columnFullName)
        })
      }
      case plan: InsertIntoHiveTable => {
        val tableName = plan.table.database + "." + plan.table.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: InsertIntoHadoopFsRelationCommand => {
        val catalogTable: CatalogTable = plan.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: CreateHiveTableAsSelectCommand => {
        val tableName = plan.tableDesc.database + "." + plan.tableDesc.identifier.table
        extTargetTable(tableName, plan.query)
      }
      case plan: Aggregate => {
        plan.aggregateExpressions.foreach(aggItem => {
          extFieldProcess(aggItem)
        })
      }
      case plan: Project => {
        plan.projectList.toList.foreach(pojoItem => {
          extFieldProcess(pojoItem)
        })
      }
      // case `plan` => logInfo("******child plan******:\n" + plan)
    }
  }

  def extFieldProcess(namedExpression: NamedExpression): Unit = {
    // an alias means a transformation happened; otherwise the value is carried through unchanged
    if ("alias".equals(namedExpression.prettyName)) {
      val sourceFieldId = namedExpression.exprId.id
      val targetFieldIdSet: mutable.Set[Long] = fieldProcess.getOrElse(sourceFieldId, mutable.Set.empty)
      namedExpression.references.foreach(attribute => {
        targetFieldIdSet += attribute.exprId.id
      })
      fieldProcess += (sourceFieldId -> targetFieldIdSet)
    }
  }

  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    plan.output.foreach(columnAttribute => {
      val columnFullName = tableName + "." + columnAttribute.name
      targetTable += (columnAttribute.exprId.id -> columnFullName)
    })
  }

  /**
   * Extract lineage from the collected relations: for every target-table field, check whether it maps
   * directly to a source-table field; if not, follow the transformation chain until a source field is found.
   */
  def connectSourceFieldAndTargetField(): Unit = {
    val fieldIds = targetTable.keySet
    fieldIds.foreach(fieldId => {
      val resTargetFieldName = targetTable(fieldId)
      val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
      if (sourceTables.contains(fieldId)) {
        val sourceFieldId = sourceTables.getOrElse(fieldId, "")
        resSourceFieldSet += sourceFieldId
      } else {
        val targetIdsTmp = findSourceField(fieldId)
        resSourceFieldSet ++= targetIdsTmp
      }
      fieldLineage += (resTargetFieldName -> resSourceFieldSet)
    })
  }

  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
    if (fieldProcess.contains(fieldId)) {
      val fieldIds: mutable.Set[Long] = fieldProcess.getOrElse(fieldId, mutable.Set.empty)
      fieldIds.foreach(fieldId => {
        if (sourceTables.contains(fieldId)) {
          resSourceFieldSet += sourceTables(fieldId)
        } else {
          val sourceFieldSet = findSourceField(fieldId)
          resSourceFieldSet ++= sourceFieldSet
        }
      })
    }
    resSourceFieldSet
  }
}

<5>. The TestHive test class

package com.roundyuan.sparkagent

import org.apache.spark.sql.SparkSession

object TestHive {

  def main(args: Array[String]): Unit = {
    // connect to the Hive data warehouse
    val sparkSession = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://master1:9083,thrift://slave1:9083")
      .config("HiveDb", "ods.db,dcl.db,dw.db,mdp.db,ads.db")
      .config("HdfsPath", "hdfs://nameservice1:8020/user/hive/warehouse/")
      .config("Hdfs.dfs.nameservices", "nameservice1")
      .config("Hdfs.dfs.ha.namenodes.nameservice1", "namenode424,namenode442")
      .config("Hdfs.dfs.namenode.rpc-address.nameservice1.namenode424", "master1:8020")
      .config("Hdfs.namenode.rpc-address.nameservice1.namenode442", "master2:8020")
      .config("spark.sql.warehouse.dir", "hdfs://nameservice1/user/hive/warehouse")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    val listenV3 = new FlQueryExecutionListener()
    sparkSession.listenerManager.register(listenV3)

    // sparkSession.sql("show databases").show()
    sparkSession.sql("drop table tmp.t7")
    sparkSession.sql("create table tmp.t8 As select * from mdp.mdp_cusdm_social limit 10").show()
    // val a_1 = sparkSession.sql("select * from tmp.t3 limit 10").show()
    // val a_2 = sparkSession.sql("select * from tmp.t4").toDF()
    // val a_3 = a_1.join(a_2, a_1("client_oneid") === a_2("client_oneid"), "left")
    // a_3.createOrReplaceTempView("tt")
    // sparkSession.sql(
    //   """
    //     |create table tmp.tt_1 as select * from tt
    //   """.stripMargin).show()
    // val user_log = sparkSession.sql("select * from mdp.mdp_cusdm_nature limit 100").collect()
    // val test = user_log.map(row => "user_id"+row(0))
    // test.map(row => println(row))
  }
}

<6>. The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roundyuan</groupId>
    <artifactId>roundyuan-spark-lineage</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-client</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-reflect</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.16</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.1.1</version>
        </dependency>
        <!-- hadoop -->
        <!--<dependency>-->
        <!--<groupId>org.apache.hadoop</groupId>-->
        <!--<artifactId>hadoop-common</artifactId>-->
        <!--<version>3.0.0</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>eclipse-add-source</id>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Java packaging (fat jar) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- drop classes that are never used when packaging -->
                            <minimizeJar>true</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <!-- suffix of the fat jar name; the default is "shaded" -->
                            <shadedClassifierName>dep</shadedClassifierName>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4. The final cleaned results


My result table has seven columns (SourceTableNme -> source table name, SourceDatabaseNme -> source database name, SourceColNme -> source column name, TargetTableNme -> target table name, TargetDatabaseNme -> target database name, TargetColNme -> target column name, updateDate -> update time).
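For reference, a minimal PostgreSQL DDL sketch for this result table, matching the column names used by DruidInsert and DruidUpdate; the column types are my assumption, adjust them to your environment:

CREATE TABLE TableLineageInfo (
    SourceTableNme     varchar(256),
    SourceDatabaseNme  varchar(256),
    SourceColNme       varchar(256),
    TargetTableNme     varchar(256),
    TargetDatabaseNme  varchar(256),
    TargetColNme       varchar(256),
    updateDate         timestamp
);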

Writing this up took real effort. If it helped you, please give it a like and a follow; more quality content is on the way, and your likes and follows are my biggest motivation!
