今天,分布式计算引擎是许多分析、批处理和流应用程序的支柱。Spark提供了许多开箱即用的高级功能(pivot、分析窗口函数等)来转换数据。有时需要处理分层数据或执行分层计算。许多数据库供应商提供诸如“递归 CTE(公用表达式)”或“join” SQL 子句之类的功能来查询/转换分层数据。CTE 也称为递归查询或父子查询。在这篇文章中,我们将看看如何使用 Spark 解决这个问题。

分层数据概述 –

存在分层关系,其中一项数据是另一项的父项。分层数据可以使用图形属性对象模型表示,其中每一行都是一个顶点(节点),连接是连接顶点的边(关系),列是顶点的属性。

一些用例

  • 财务计算 - 子账户一直累积到父账户直至最高账户
  • 创建组织层次结构 - 经理与路径的员工关系
  • 使用路径生成网页之间的链接图
  • 任何类型的涉及链接数据的迭代计算

挑战

在分布式系统中查询分层数据有一些挑战

数据是连接的,但它分布在分区和节点之间。解决这个问题的实现应该针对执行迭代和根据需要移动数据(shuffle)进行优化。
图的深度会随着时间的推移而变化——解决方案应该处理不同的深度,并且不应该强制用户在处理之前定义它。

解决方案

在 spark 中实现 CTE 的方法之一是使用Graphx Pregel API。

什么是 Graphx Pregel API?

Graphx 是用于图形和图形并行计算的 Spark API。图算法本质上是迭代的,顶点的属性取决于它们直接或间接(通过其他顶点连接)连接顶点的属性。Pregel 是由 Google 和 spark graphX 开发的以顶点为中心的图处理模型,它提供了 pregel api 的优化变体。

Pregel API 如何工作?

Pregel API 处理包括执行超级步骤

步骤 0:

将初始消息传递给所有顶点
将值作为消息发送到其直接连接的顶点

步骤 1:

接收来自前面步骤的消息
改变值
将值作为消息发送到其直接连接的顶点
重复 步骤 1 直​​到有消息传递,当没有更多消息传递时停止。

用例的分层数据

下表显示了我们将用于生成自上而下的层次结构的示例员工数据。这里员工的经理由具有 emp_id 值的 mgr_id 字段表示。

添加以下列作为处理的一部分

Level (Depth) 顶点在层次结构中所处的级别
Path 层次结构中从最顶层顶点到当前顶点的路径
Root 层次结构中最顶层的顶点,当数据集中存在多个层次结构时很有用
Iscyclic 如果有坏数据,存在循环关系,然后标记它
Isleaf 如果顶点没有父节点,则标记它

代码

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}import scala.util.hashing.MurmurHash3/*** Pregel API* @author zyh*/
object PregelTest {// The code below demonstrates use of Graphx Pregel API - Scala 2.11+// functions to build the top down hierarchy//setup & call the pregel api//设置并调用pregel apidef calcTopLevelHierarcy(vertexDF: DataFrame, edgeDF: DataFrame): RDD[(Any,(Int,Any,String,Int,Int))] = {// create the vertex RDD// primary key, root, pathval verticesRDD: RDD[(VertexId, (Any, Any, String))] = vertexDF.rdd.map{x=> (x.get(0),x.get(1) , x.get(2))}.map{ x => (MurmurHash3.stringHash(x._1.toString).toLong, ( x._1.asInstanceOf[Any], x._2.asInstanceOf[Any] , x._3.asInstanceOf[String]) ) }// create the edge RDD// top down relationshipval EdgesRDD = edgeDF.rdd.map{x=> (x.get(0),x.get(1))}.map{ x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong,"topdown" )}// create graphval graph = Graph(verticesRDD, EdgesRDD).cache()val pathSeperator = """/"""// 初始化消息// initialize id,level,root,path,iscyclic, isleafval initialMsg = (0L,0,0.asInstanceOf[Any], List("dummy"),0,1)// add more dummy attributes to the vertices - id, level, root, path, isCyclic, existing value of current vertex to build path, isleaf, pkval initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1) )val hrchyRDD = initialGraph.pregel(initialMsg,Int.MaxValue,            // 迭代次数, 设置成当前表示无限迭代下去EdgeDirection.Out)(setMsg,sendMsg,mergeMsg)// build the path from the listval hrchyOutRDD = hrchyRDD.vertices.map{case(id,v) => (v._8,(v._2,v._3,pathSeperator + v._4.reverse.mkString(pathSeperator),v._5, v._7 )) }hrchyOutRDD}//改变顶点的值def setMsg(vertexId: VertexId, value: (Long,Int,Any,List[String], Int,String,Int,Any), message: (Long,Int, Any,List[String],Int,Int)): (Long,Int, Any,List[String],Int,String,Int,Any) = {// 第一次收到的消息是初始化的消息 initialMsgprintln(s"设置值: $value  收到消息:  $message")if (message._2 < 1) { //superstep 0 - initialize(value._1,value._2+1,value._3,value._4,value._5,value._6,value._7,value._8)}else if ( message._5 == 1) { // set isCyclic (判断是不是一个环)(value._1, value._2, value._3, value._4, message._5, value._6, value._7,value._8)} else if ( message._6 == 0 ) { // set isleaf(value._1, value._2, value._3, value._4, value._5, value._6, message._6,value._8)}else { // set new values//( message._1,value._2+1, value._3, value._6 :: message._4 , value._5,value._6,value._7,value._8)( message._1,value._2+1, message._3, value._6 :: message._4 , value._5,value._6,value._7,value._8)}}// 将值发送到顶点def sendMsg(triplet: EdgeTriplet[(Long,Int,Any,List[String],Int,String,Int,Any), _]): Iterator[(VertexId, (Long,Int,Any,List[String],Int,Int))] = {val sourceVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.srcAttrval destinationVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.dstAttrprintln(s" 源头: $sourceVertex   目的地:   $destinationVertex")// 检查是不是一个死环, 就是 a是b的领导, b是a的领导// check for icyclicif (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1) {println(s"存在死环    源头: ${sourceVertex._1}        目的地:  ${triplet.dstId}")if (destinationVertex._5 == 0) { //set iscyclicIterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7)))} else {Iterator.empty}}else {// 判断是不是叶子节点,就是没有子节点的节点,属于叶子节点,根节点不算 ,所以样例数据中的叶子节点是 3,8,10if (sourceVertex._7==1) //is NOT leaf{Iterator((triplet.srcId, (sourceVertex._1,sourceVertex._2,sourceVertex._3, sourceVertex._4 ,0, 0 )))}else { // set new valuesIterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1)))}}}// 从所有连接的顶点接收值def mergeMsg(msg1: (Long,Int,Any,List[String],Int,Int), msg2: (Long,Int, Any,List[String],Int,Int)): (Long,Int,Any,List[String],Int,Int) = {println(s"合并值:   $msg1     $msg2")// dummy logic not applicable to the data in this usecasemsg2}// Test with some sample datadef main(args: Array[String]): Unit = {// 屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)val spark: SparkSession = SparkSession.builder.appName(s"${this.getClass.getSimpleName}").master("local[1]").getOrCreate()val sc = spark.sparkContext// RDD 转 DF, 隐式转换import spark.implicits._val empData = Array(// 测试没有顶级的父节点,会出现空指针异常,构建图的时候,会根据边生成一个为null的顶点("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String]), ("EMP002", "Jim", "Lake", "CIO", "EMP001"), ("EMP003", "Tim", "Gorab", "MGR", "EMP002"), ("EMP004", "Rick", "Summer", "MGR", "EMP002"), ("EMP005", "Sam", "Cap", "Lead", "EMP004"), ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005"), ("EMP007", "Cathy", "Watson", "Dev", "EMP006"), ("EMP008", "Samantha", "Lion", "Dev", "EMP007"), ("EMP009", "Jimmy", "Copper", "Dev", "EMP007"), ("EMP010", "Shon", "Taylor", "Intern", "EMP009")// 空指针和顶点数据重复没有关系// 空指针和父节点在顶点中找不到有关系 (父顶点为null没有关系,需要父顶点能够在顶点列表中能找到), ("EMP011", "zhang", "xiaoming", "CTO", null))// create dataframe with some partitionsval empDF = sc.parallelize(empData, 3).toDF("emp_id","first_name","last_name","title","mgr_id").cache()// primary key , root, path - dataframe to graphx for verticesval empVertexDF = empDF.selectExpr("emp_id","concat(first_name,' ',last_name)","concat(last_name,' ',first_name)")// parent to child - dataframe to graphx for edgesval empEdgeDF = empDF.selectExpr("mgr_id","emp_id").filter("mgr_id is not null")// call the functionval empHirearchyExtDF: DataFrame = calcTopLevelHierarcy(empVertexDF,empEdgeDF).map{ case(pk,(level,root,path,iscyclic,isleaf)) => (pk.asInstanceOf[String],level,root.asInstanceOf[String],path,iscyclic,isleaf)}.toDF("emp_id_pk","level","root","path","iscyclic","isleaf").cache()// extend original table with new columnsval empHirearchyDF = empHirearchyExtDF.join(empDF , empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk")).selectExpr("emp_id","first_name","last_name","title","mgr_id","level","root","path","iscyclic","isleaf")// printempHirearchyDF.show()}
}

输出

任务执行

Spark 作业分解为作业、阶段和任务。由于其迭代性质,Pregel API 在内部生成多个作业。每次将消息传递到顶点时都会生成一个作业。由于数据可能位于不同的节点上,因此每个作业可能会以多次 shuffle 结束。

需要注意的是在处理大型数据集时创建的长 RDD 谱系。

概括

Graphx Pregel API 非常强大,可用于解决迭代问题或任何图形计算。

图计算: 使用 Spark Graphx Pregel API 处理分层数据相关推荐

  1. Spark GraphX Pregel 应用

    一.Pregel介绍 Pregel是一种基于BSP模型实现的并行图处理系统. BSP(Bulk Synchronous Parallel Computing Model,块同步并行计算模型,又称&qu ...

  2. Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

    Spark Graphx Pregel 一.Pregel概述 1.什么是pregel? 2.pregel应用场景 二.Pregel源码及参数解释 1.源码 2.参数详细解释 (1)initialMsg ...

  3. 重磅:腾讯正式开源图计算框架Plato,十亿级节点图计算进入分钟级时代

    整理 | 唐小引 来源 | CSDN(ID:CSDNnews) 腾讯开源进化 8 年,进入爆发期. 继刚刚连续开源 TubeMQ.Tencent Kona JDK.TBase.TKEStack 四款重 ...

  4. 腾讯正式开源图计算框架Plato,十亿级节点图计算进入分钟级时代

    腾讯开源再次迎来重磅项目,14日,腾讯正式宣布开源高性能图计算框架Plato,这是在短短一周之内,开源的第五个重大项目. 相对于目前全球范围内其它的图计算框架,Plato可满足十亿级节点的超大规模图计 ...

  5. 重磅!腾讯正式开源图计算框架 Plato,十亿级节点图计算进入分钟级时代

    整理 | 唐小引 封图 | 付费下载自东方 IC 出品 | CSDN(ID:CSDNnews) 腾讯开源进化 8 年,进入爆发期! 继刚刚连续开源 TubeMQ.Tencent Kona JDK.TB ...

  6. Spark GraphX图计算入门

    一.什么是图计算 图计算,可以简单理解为以图这种数据结构为基础,整合相关算法来实现对应应用的计算模型.社交网络中人与人之间的关系,如果用计算机数据结构表示,最合适的就是图了.其中图的顶点表示社交中的人 ...

  7. spark的数三角形算法_spark graphx 图计算

    写在前面 态度决定高度!让优秀成为一种习惯! 世界上没有什么事儿是加一次班解决不了的,如果有,就加两次!(- - -茂强) 什么是一个图 一个网络 Network 一个树 Tree 一个RDBMS R ...

  8. Spark GraphX图计算框架原理概述

    言之易而为之难,学习大数据之图计算,就是从"浊"中找出"静"的规律,达到"清"的境界:从"安"中找出"生&qu ...

  9. 那个分分钟处理10亿节点图计算的Plato,现在怎么样了?

    受访者 | 于东海 记者 | 夕颜 出品 | CSDN(ID:CSDNnews) 「AI 技术生态论」 人物访谈栏目是 CSDN 发起的百万人学 AI 倡议下的重要组成部分.通过对 AI 生态顶级大咖 ...

  10. 字节跳动自研万亿级图数据库 图计算实践 【太高级了,不是圈里的人,有简明见解的吗?】

    1. 图状结构数据广泛存在 字节跳动的所有产品的大部分业务数据,几乎都可以归入到以下三种: 用户信息.用户和用户的关系(关注.好友等): 内容(视频.文章.广告等): 用户和内容的联系(点赞.评论.转 ...

最新文章

  1. Lucene核心数据结构——FST存词典,跳表存倒排或者roarning bitmap 见另外一个文章...
  2. Django 的模板语法之过滤器
  3. C语言实现hash/adler32算法(附完整源码)
  4. 【springboot】SpringBootTest 测试时, application.properties 文件的查找顺序
  5. 广东省计算机大赛设计什么时候,2017年广东省大学生计算机设计大赛
  6. 滑动窗口/二分 - 尽可能使字符串相等
  7. php中rowcount 意思,PHP PDOStatement::rowCount讲解
  8. [数据结构-严蔚敏版]P61ADT Queue的表示与实现(单链队列-队列的链式存储结构)
  9. 云小课 | 需求任务还未分解,该咋整!项目管理Scrum项目工作分解的心酸谁能知?
  10. 你是你产品的头号用户
  11. NLP中的词向量总结与实战:从one-hot到bert
  12. easy~算法刷题第三天
  13. python下载论文_Python实现一个论文下载器的过程
  14. [转载] python可视化分析(matplotlib、seaborn、ggplot2)
  15. 移动硬盘计算机管理无法显示,移动硬盘不显示怎么办
  16. 用 Python 发电子邮件
  17. 大规模图算法在京东广告的实践
  18. 2021华为杯D题详细讲解:抗乳腺癌候选药物的优化建模
  19. 推荐一些在线效率工具汇总(数据分析,舆情监测、图片语义识别等)
  20. java 验证码突然不显示

热门文章

  1. 《画解数据结构》三十张彩图,画解二叉搜索树
  2. 13-zinx-Golang-MMO游戏Proto3协议
  3. element 问号提示_软件安装时显示问号的解决方法
  4. pythonfor循环100次_Python之for循环
  5. Ubuntu 20.04 server乌班图服务器部署django,uwsgi,mysql,nginx细节
  6. jQuery插件库免下载做法
  7. 直观解释Gamma分布和Poisson分布
  8. Docker使用教程超详细
  9. hdu 6357 Hills And Valleys——dp
  10. c语言1 2.5*3,若有如下变量定义并赋值:inta=1,b=2,c=3,k;float f=2.5,e;doubled=2.4,g;则下列符合C语言语法的...