Doris支持spark导入设计文档

背景

Doris现在支持Broker load/routine load/stream load/mini batch load等多种导入方式。 spark load主要用于解决初次迁移,大量数据迁移doris的场景,用于提升数据导入的速度。

FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。

BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。

Tablet: 一个palo table的水平分片称为tablet。

Dpp:Data preprocessing,数据预处理模块,通过外部计算资源(Hadoop、Spark)完成对导入数据预处理,包括转化、清洗、分区、排序和聚合等。

设计

目标

Doris中现有的导入方式中,针对百G级别以上的数据的批量导入支持不是很好,功能上需要修改很多配置,而且可能无法完成导入,性能上会比较慢,并且由于没有读写分离,需要占用较多的cpu等资源。而这种大数据量导入会在用户迁移的时候遇到,所以需要实现基于spark集群的导入功能,利用spark集群的并发能力,完成导入时的ETL计算,排序、聚合等等,满足用户大数据量导入需求,降低用户导入时间和迁移成本。

在Spark导入中,需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;同时,由于用户数据格式多种多样,需要支持包括csv、parquet、orc等多种格式的数据文件。

实现方案

在将spark导入的设计实现的时候,有必要讲一下现有的导入框架。现在有的导入框架,可以参考《Doris Broker导入实现解析》。

方案1

参考现有的导入框架和原有适用于百度内部hadoop集群的hadoop导入方式的实现,为了最大程度复用现有的导入框架,降低开发的难度,整体的方案如下:

用户的导入语句经过语法和语意分析之后,生成LoadStmt,LoadStmt中增加一个isSparkLoad标识字段,如果为true,就会创建出SparkLoadJob,跟BrokerLoadJob类似,会通过状态机机制,实现Job的执行,在PENDING,会创建SparkLoadPendingTask,然后在LOADING阶段还是创建LoadLoadingTask,进行数据导入。在BE中,复用现有的计划执行框架,执行导入计划。

实现Spark导入主要需要考虑以下几点:

#语法
这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:

LOAD LABEL example_db.label1
(
DATA INFILE(“hdfs://hdfs_host:hdfs_port/user/palo/data/input/file”)
NEGATIVE
INTO TABLE my_table
PARTITION (p1, p2)
COLUMNS TERMINATED BY “,”
columns(k1,k2,k3,v1,v2)
set (
v3 = v1 + v2,
k4 = hll_hash(k2)
)
where k1 > 20
)
with spark.cluster_name
PROPERTIES
(
“spark.master” = “yarn”,
“spark.executor.cores” = “5”,
“spark.executor.memory” = “10g”,
“yarn.resourcemanager.address” = “xxx.tc:8032”,
“max_filter_ratio” = “0.1”,
);

其中spark.cluster_name为用户导入使用的Spark集群名,可以通过SET PROPERTY来设置,可参考原来Hadoop集群的设置。 property中的Spark集群设置会覆盖spark.cluster_name中对应的内容。 各个property的含义如下:

spark.master是表示spark集群部署模式,支持包括yarn/standalone/local/k8s,预计先实现yarn的支持,并且使用yarn-cluster模式(yarn-client模式一般用于交互式的场景)。
spark.executor.cores: executor的cpu个数
spark.executor.memory: executor的内存大小
yarn.resourcemanager.address:指定yarn的resourcemanager地址
max_filter_ratio:指定最大过滤比例阈值
#SparkLoadJob
用户发送spark load语句,经过parse之后,会创建SparkLoadJob,

SparkLoadJob:+-------+-------+|    PENDING    |-----------------|+-------+-------+                 || SparkLoadPendingTask    |v                         |+-------+-------+                 ||    LOADING    |-----------------|+-------+-------+                 || LoadLoadingTask         |v                         |+-------+-------+                 ||  COMMITTED    |-----------------|+-------+-------+                 ||                         |v                         v  +-------+-------+         +-------+-------+     |   FINISHED    |         |   CANCELLED   |+-------+-------+         +-------+-------+|                         Λ+-------------------------+

上图为SparkLoadJob的执行流程。

SparkLoadPendingTask

SparkLoadPendingTask主要用来提交spark etl作业到spark集群中。由于spark支持不同部署模型(localhost, standalone, yarn, k8s),所以需要抽象一个通用的接口SparkEtlJob,实现SparkEtl的功能,主要接口包括:

提交spark etl任务
取消spark etl的任务
获取spark etl任务状态的接口
大体接口如下:

class SparkEtlJob { // 提交spark etl作业 // 返回JobId String
submitJob(TBrokerScanRangeParams params); // 取消作业,用于支持用户cancel导入作业 bool cancelJob(String jobId); // 获取作业状态,用于判断是否已经完成
JobStatus getJobStatus(String jobId); private: std::list<DataDescription> data_descriptions; };

可以实现不同的子类,来实现对不同集群部署模式的支持。可以实现SparkEtlJobForYarn用于支持yarn集群的spark导入作业。具体来说上述接口中JobId就是Yarn集群的appid,如何获取appid?一个方案是通过spark-submit客户端提交spark job,然后分析标准错误中的输出,通过文本匹配获取appid。

这里需要参考hadoop dpp作业的经验,就是需要考虑任务运行可能因为数据量、集群队列等原因,会达到并发导入作业个数限制,导致后续任务提交失败,这块需要考虑一下任务堆积的问题。一个方案是可以单独设置spark load job并发数限制,并且针对每个用户提供一个并发数的限制,这样各个用户之间的作业可以不用相互干扰,提升用户体验。

spark任务执行的事情,包括以下几个关键点:

类型转化(extraction/Transformation)

将源文件字段转成具体列类型(判断字段是否合法,进行函数计算等等)

函数计算(Transformation),包括negative计算

完成用户指定的列函数的计算。函数列表:“strftime”,“time_format”,“alignment_timestamp”,“default_value”,“md5sum”,“replace_value”,“now”,“hll_hash”,“substitute”

Columns from path的提取

进行where条件的过滤

进行分区和分桶

排序和预聚合

因为在OlapTableSink过程中会进行排序和聚合,逻辑上可以不需要进行排序和聚合,但是因为排序和预聚合可以提升在BE端执行导入的效率。**如果在spark etl作业中进行排序和聚合,那么在BE执行导入的时候可以省略这个步骤。**这块可以依据后续测试的情况进行调整。目前看,可以先在etl作业中进行排序。 还有一个需要考虑的就是如何支持bitmap类型中的全局字典,string类型的bitmap列需要依赖全局字典。 为了告诉下游etl作业是否已经完成已经完成排序和聚合,可以在作业完成的时候生成一个job.json的描述文件,里面包含如下属性:

{"is_segment_file" : "false","is_sort" : "true","is_agg" : "true",
}

其中: is_sort表示是否排序 is_agg表示是否聚合 is_segment_file表示是否生成的是segment文件

现在rollup数据的计算都是基于base表,需要考虑能够根据index之间的层级关系,优化rollup数据的生成。

这里面相对比较复杂一点就是列的表达式计算的支持。

最后,spark load作业完成之后,产出的文件存储格式可以支持csv、parquet、orc,从存储效率上来说,建议默认为parquet。

LoadLoadingTask

LoadLoadingTask可以复现现在的逻辑,但是,有一个地方跟BrokerLoadJob不一样的地址就是,经过SparkEtlTask处理过后的数据文件已经完成列映射、函数计算、负导入、过滤、聚合等操作,这个时候LoadLoadingTask就不用进行这些操作,只需要进行简单的列映射和类型转化。

BE导入任务执行

这块可以完全复用现有的导入框架,应该不需要做改动。

方案2

方案1可以最大限度的复用现有的导入框架,能够快速实现支持大数据量导入的功能。但是存在以下问题,就是经过spark etl处理之后的数据其实已经按照tablet划分好了,但是现有的Broker导入框架还是会对流式读取的数据进行分区和bucket计算,然后经过序列化通过rpc发送到对应的目标BE的机器,有一次序列化和网络IO的开销。 方案2是在SparkEtlJob生成数据的时候,直接生成doris的存储格式Segment文件,然后三个副本需要通过类似clone机制的方式,通过add_rowset接口,进行文件的导入。这种方案具体不一样的地方如下:

需要在生成的文件中添加tabletid后缀
在SparkLoadPendingTask类中增加一个接口protected Map<long, Pair<String, Long>> getFilePathMap()用于返回tabletid和文件之间的映射关系,
在BE rpc服务中增加一个spark_push接口,实现拉取源端etl转化之后的文件到本地(可以通过broker读取),然后通过add_rowset接口完成数据的导入,类似克隆的逻辑
生成新的导入任务SparkLoadLoadingTask,该SparkLoadLoadingTask主要功能就是读取job.json文件,解析其中的属性并且,将属性作为rpc参数,调用spark_push接口,向tablet所在的后端BE发送导入请求,进行数据的导入。BE中spark_push根据is_segment_file来决定如何处理,如果为true,则直接下载segment文件,进行add rowset;如果为false,则走pusher逻辑,实现数据导入。
该方案将segment文件的生成也统一放到了spark集群中进行,能够极大的降低doris集群的负载,效率应该会比较高。但是方案2需要依赖于将底层rowset和segment v2的接口打包成独立的so文件,并且通过spark调用该接口来将数据转化成segment文件。

总结

综合以上两种方案,第一种方案的改动量比较小,但是BE做了重复的工作。第二种方案可以参考原有的Hadoop导入框架。所以,计划分两步完成spark load的工作。

第一步,按照方案2,实现通过Spark完成导入数据的分区排序聚合,生成parquet格式文件。然后走Hadoop pusher的流程由BE转化格式。

第二步,封装segment写入的库,直接生成Doris底层的格式,并且增加一个rpc接口,实现类似clone的导入逻辑。

Doris支持spark导入设计文档相关推荐

  1. 学生成绩管理系统-设计文档

    第1章 概述 1.1 课题背景 由于每年读书的人越来越多,有关学生的各种信息量也成倍增长,各类学生的统计分析工作也越来越困难,面对如此繁杂的工作,为了能够为高校学生信息管理提供一种更加高效实用的管理手 ...

  2. 自动生成数据库设计文档利器

    目录 1.前言介绍 2.功能说明 3.编码实战 1.新建项目 2.导入依赖 3.启动类 4.工具类 5.测试 1.前言介绍 今天给大家介绍一款非常还用的小工具,专门用到生成数据库设计文档的,非常简单. ...

  3. iOS端的UI设计文档

    iOS端的UI设计文档 APP和网站,风格色调始终注意保持一致(平台一致性) 在App不断更新的过程中定义设计准则.风格.规范 设计规范: 1.分类合理(为了能让用户快速查找,合理的分类必不可少) 2 ...

  4. 写好一份软件开发设计文档

    如何写好一份软件开发设计文档 转载: 设计文档 - 也被称作技术规范和实现手册,描述了你如何去解决一个问题,是确保正确完成工作最有用的工具,其目的是迫使你对设计展开缜密的思考,并收集他人的反馈,进而完 ...

  5. 行车记录仪程序源码设计文档

    以下是一个行车记录仪程序的简单设计文档示例: ## 行车记录仪程序设计文档 ### 1. 简介 行车记录仪程序是一个用于记录和管理车辆行驶数据的应用程序.它可以捕捉车辆的位置.速度.加速度等信息,并将 ...

  6. 朱晔的互联网架构实践心得S1E9:架构评审一百问和设计文档五要素

    朱晔的互联网架构实践心得S1E9:架构评审一百问和设计文档五要素 [下载文本PDF进行阅读] 本文我会来说说我认为架构评审中应该看的一些点,以及我写设计文档的一些心得.助你在架构评审中过五关斩六将,助 ...

  7. 结对编程项目作业-设计文档

    成员:常帆,朱佳明 贪吃蛇 贪吃蛇"游戏是一个经典的游戏,它操作简单.界面美观.功能较齐全的"贪吃蛇"游戏. 用的Python. 设计文档:1.导入数据库2.初始化游戏3 ...

  8. 如何写出一份优秀的软件设计文档

    作为一名软件工程师,我花了很多时间阅读和编写设计文档.在完成了数百篇这些文档之后,我亲眼目睹了优秀设计文档与项目最终成功之间的强烈关联. 本文试图描述什么使设计文档变得更好. 本文分为4个部分: · ...

  9. 如何才能写出好的软件设计文档?

    作为一名软件工程师,我花了很多时间在阅读和撰写设计文档上.在磨砺了数百篇文档之后,我发现,优秀的设计文档与项目的成功之间有着密切的联系. 这篇文章将介绍怎样才能写出一份优秀的设计文档. 为什么要写设计 ...

最新文章

  1. 用父节点表示法表示一棵树
  2. GPT-3到来,程序员会被AI取代吗?
  3. 感知机模型-原始版本【python实现】
  4. java-第九章-循环结构进阶-三个班级每班4位同学成绩大于85的算平均分.
  5. java项目(java project)如何导入jar包的解决方案列表
  6. 带出7个“师弟”,支付宝BASIC College的辅导员是个伪90后
  7. matlab GUI 设计 自学笔记
  8. edui 富文本编辑_React中使用UEditor百度富文本的方法
  9. oracle监听器启动停止,Oracle 10g 监听器启动后停止的解决办法
  10. cocos2d-x 3.0 使用Sax解析xml文档(解决中文显示问题)
  11. java学生成绩管理系统
  12. html css手册,CSS 参考手册
  13. 编码 - ASCII码表
  14. 照片太大怎么缩小kb?
  15. SyntaxError: Non-ASCII character ‘\xe7‘ in file F:/python_code/test/venv/Shan.py on line 7,
  16. php视频边下边播,PHP + NGINX 控制视频文件播放,并防止文件下载
  17. 用JavaScript移动对象
  18. 打印机服务出现问题的解决方法
  19. 清华大学java_清华大学出版社-图书详情-《Java语言程序设计(第3版)》
  20. Node之父重构的Deno终于发布了,它终究会取代Node吗?

热门文章

  1. C# TCP服务器和客户端
  2. sql中去掉字段的所有空格
  3. 2019-5-5学习心得
  4. pygame.error: font not initialized的解决及init()到底干了什么
  5. Confluence 6 Windows 中以服务方式自动重启修改运行服务的用户
  6. 2017年浙江中医药大学大学生程序设计竞赛(重现赛)D - CC的神奇背包
  7. Python中的break和continue的使用方法
  8. 火狐优化及遇到的问题
  9. 删除数据oracle,oracle删除数据
  10. 织梦php me,GitHub - czmz/DedeCMSv5: 中国专业的PHP网站内容管理系统-织梦内容管理系统...