Doris同步多库多表
官方的东西抽象到技术层面,跟具体的业务有点脱节,我们需要下沉封装,而不是削足适履。
引言
Doris用多了,把一些坑都免疫了,遇到就知道不该跳,就像spark/flink的算子调优一样,还用后期调优吗?不应该在写的时候,就肌肉记忆的使用reduceByKey来代替groupByKey吗?
与其叫“Doris同步多库多表”不如叫“Doris同步binlog踩坑指南”,基于当前大众化的实时架构,来将业务库的数据同步到Doris,做到数据的一致性,后期也希望palo团队做一下库表的过滤。
这里我说的是业务库,业务库,业务库,一般来说业务库格式更加标准和统一,并且不存在删除操作,也不会用mysql来存太大的字段,利用这几点特性来实现数据的统一,我们目前有二十几个mysql实例,3000多张表,只需要维护二十几个flink任务即可,不懂doris原理的同学照样可以进行数据同步。
routineLoad不是个好选择
有一些文章推荐使用RoutineLoad来做业务库的数据同步,这是个坏的选择
- routineLoad依靠的kafka的topic,并且是基于表纬度的数据同步
如果你的上游用的maxwell,那么你需要读kafka,过滤库表,再写入kafka,最后用routineLoad去接,增加了一步数据etl,并且将链路又加长了,增加了业务不稳定性;
如果你用的canal来同步binlog,那么你要对每一个表创建一条链路,会导致canal压力增大,cup飙升,影响整个实时链路 - 使用复杂
编写任务时,要指明字段和过滤格式,不好同一管理,当上游表结构发生变更时,必须进行重写任务,而不是重启,有点蠢 - 管理缺陷,有多少表就有多少routineLoad任务
Doris没有提供可视化界面,几千个呢,即使有问题出现了也不知道是那个出问题,集群出问题,所有任务都要重启,重启也不能指定时间重启,出问题的时候就知道有多蠢了。 - routineLoad的本质是streamLoad
官方文档中说的很明白,所有千万不要说routineLoad是实时同步,是微批同步而已,要额外指定batch_rows和batch_interval的,与其把压力给BE,不如给flink做这个etl - routineLoad同步业务库数据时要注意的几个点
1)不要指定error_row,既然是业务库数据,就要保证百分百的正确,有容错条数是几个意思?
2)kafka上游要严格指定messageKey,建议是(库名+表名+主键),这样才能保证同一个主键下的数据永远在同一个partition中,不会出现乱序的问题。
3)下游表设计时,varchar的长度尽量大点,Doris不允许超长度,mysql却可以,如果上游表字段长度太长,下游一定报错。
4)下游表设计时,default字段为null,既然是binlog的数据,就不会有缺少字段,就不要硬给数值,设置成null的好处还有就是streamLoad同步json时,表结构发生变更不会报错
上边这几条是通用的
准备
- topic中的数据是以库为单位,或者是实例为单位
- binlog写入kafka要有严格的messageKey
- 稳定的kafka和flink集群
开发
可以提前看一下我的这篇博客Flink写入Doris的实时应用
确定我们的参数
- batch_rows和batch_interval:批次提交行数和时间间隔
- 重启方式:时间、offset,既然是业务库,不怕重复消费kafka,建议是时间,比如2h前或者2021/04/26/18(精确到小时就行)
- 指定topic:读那个topic,写入那些表可以做映射,也可以读Doris上对应的库表名
- 提交curl的线程数:既然是streamLoad,那肯定是基于表的,同时允许几个curl提交,单库不得超过100个线程
解析conf
传参也好,读xml也好,读yaml也好,根据自己的喜好来。
source
想维护offset,可以看我的文章Flink手动维护kafka的offset
关键是用时间戳来读取kafka数据
etl
传统的步骤,过滤出binlog里的(data、database、table)就够了,重新组合成一个结构,准备给sink使用
sink
切记:streamLoad 不要带 -H “max_filter_ratio:0.01”,业务库数据不允许丢数据
- 根据topic获取要写入的表,存到一个set中。比如我的topic是以库为单位的,那么doris的库和mysql中的也是对应的,用jdbc去读一下这个库里有那些表就可以了;或者是我将要同步的表维护到redis,间隔一段时间去redis里同步一下要写入的表。
- 定义一个缓存数据的list来积累数据,并开始计数和计时
- 到达batch_rows和batch_interval阈值时进行提交
- 创建一个HashMap[String, ArrayList[String]],对set和list进行过滤,即put({databses}.{tablename}, ArrayList[{binlog中的data}])
- 遍历HashMap,多线程写入doris
- 重置batch_rows、batch_interval、list、HashMap
第5步的代码(简化版),结合Flink写入Doris的实时应用去看会更清晰,是我略的部分,CurlCallableThread是个执行curl的多线程。
curlThread是我之前提到的一个参数:“提交curl的线程数”,另外对任务返回结果的一个报错。
var isStop = falsevar lastRes = ""if(insertDataMap.nonEmpty){val latch = new CountDownLatch(1)val pool = Executors.newFixedThreadPool(curlThread)val reslist = new java.util.ArrayList[(Future[String], String)]()try{for(elem <- insertDataMap){val data = elem._3.mkString("[",",\n","]")//组合成jsonArrayval database = elem._1 //得到databaseval table = elem._2 //得到tableval path = s"/tmp/flink_doris/$topic/$getCurrentThreadId/$database.$table"val c1 = new CurlCallableThread(data, path, database, table, latch)val f1 = pool.submit(c1)reslist.add((f1, table))}latch.await()for (i <- 0 until reslist.length){val res = reslist(i)._1.get().toStringif(!res.startsWith("accessed")){Logger.warn(res)lastRes = resif(res.contains("ErrorURL") || res.contains("unknown table")){//打印错误Logger.error(content)Logger.error(lastRes)//删除val table = reslist(i)._2tableErr(table)}else{isStop = true}}}}catch {case e: InterruptedException => e.printStackTrace()} finally {Logger.warn(s"${timestampToDate(System.currentTimeMillis())} upsert $topic data $listLength t")pool.shutdown()insertDataMap.clear()if(isStop){val content = s"topic $topic stop in "+ timestampToDate(System.currentTimeMillis())//打印错误Logger.error(content)Logger.error(lastRes)//停止println(1/0)}}
自己做好报错监控,邮件、钉钉、短信等等
使用
存量数据补完(后期会出一篇如何快速补存量数据的文章),topic准备好就可以开始启动了
指定batch_rows和batch_interval,topic,重启方式,线程数就可以了。
当业务表字段发生变更,提前到Doris执行alter命令就可以了,字段default设置为null就不会报错
新建表时,在Doris提前建好,重启识别一下要写入的表就行了
如果没有及时的进行变更,那也没事,把重启方式的时间往前推到变更业务表之前,重新补数据即可。
如果发现报字段长度问题,超了65533,结合业务看看是不是可以删掉这个字段,不影响数据同步。
根据业务库的重要程度来调整batch_interval,建议10s以上,准实时即可
感谢百度同学热心帮助和老师的指导,以及数仓同学的case
Doris同步多库多表相关推荐
- mysql单源多表同步单库单表_MySQL主从复制单表或者多表
MySQL数据库安装不过多的介绍了:必须保证2个数据库的版本一致. 主数据库:192.168.0.43 从数据库:192.168.0.53 修改43主数据 MySQL数据库安装不过多的介绍了:必须保证 ...
- mysql创建库和表确保utf8_mysql创建utf8数据库
CentOS6.5下通过Shell创建.备份.还原MySQL数据库 CentOS6.5下通过Shell创建.备份.还原MySQL数据库 创建数据库: mysql -uroot -p123456 -e ...
- mysql主从只同步部分库或表
同步部分数据有两个思路,1.master只发送需要的:2.slave只接收想要的. master端: binlog-do-db 二进制日志记录的数据库(多数据库用逗号,隔开) binlog- ...
- 程序员自救指南:一不小心删库删表怎么办?
作者丨林晓斌 策划丨小智 写在前面 虽然我们之前遇到的大多数的数据被删,都是运维同学或者 DBA 背锅的.但实际上,只要有数据操作权限的同学,都有可能踩到误删数据这条线. 今天我们就来聊聊误删数据前后 ...
- mysql 修改单表导入大小_MySQL更改大库大表存储引擎方案
一. 概述 检查库中myisam的表, sql如下: SELECT * FROM `tables` WHERE table_schema = 'UAR_STATISTIC' AND ENGINE = ...
- 多库多表场景下使用 Amazon EMR CDC 实时入湖最佳实践
一.前言CDC(Change Data Capture) 从广义上讲所有能够捕获变更数据的技术都可以称为 CDC,但本篇文章中对 CDC 的定义限定为以非侵入的方式实时捕获数据库的变更数据.例如:通过 ...
- 95-分库分表技术之ShardingJDBC
分库分表技术之ShardingJDBC ShardingJDBC: 回顾上一章的分库分表方式: 分库分表的目的就是将我们的单库的数据控制在合理范围内,从而提高数据库的性能 垂直拆分(按照结构分): 垂 ...
- mysql-分库分表概述
分库分表概述 互联网系统需要处理大量用户的请求.比如微信日活用户破10亿,海量的用户每天产生海量的数量:美团外卖,每天都是几千万的订单,那这些系统的用户表.订单表.交易流水表等是如何处理呢? 数据量只 ...
- 比对两个数据库之间的库、表/视图以及列的差异
本项目在我的github更新 https://github.com/nongxl/DBsDiff # DBsDiff #####比对两个数据库之间的库.表/视图以及列的差异.适用于开发库和正式库的比对 ...
最新文章
- HTC Desire 金卡制作方法
- 必看2019年学员信息系统项目管理师长篇备考经验
- 深入理解MySQL底层架构,看这一篇文章就够了!
- 二改注册登录版素材代下载搜索引擎系统源码,自带火车头采集
- ansible最大并发_通过这7种方法来最大程度地提高Ansible技能
- 这本 Python 入门畅销书《“笨办法”学 Python 3》,不仅仅是一本书!
- centos镜像 from_Docker 基于 CentOS 基础镜像开发环境的搭建与部署
- 和丰钢结构企业erp管理软件
- java导出excel设置行高列宽_Java 设置Excel自适应行高或列宽
- unix和linux发音,Linux术语发音大全
- 一个bug看一天,写代码像cxk
- 总所周知,Github是一个读小说的网站!《Re0:从零开始的异世界生活》Web版
- 西南石油大学计算机类云南省分数线,2017西南石油大学各专业分数线
- OpenJudge- 1789:算24
- IDC发布2021年中国人工智能市场10大预测
- wincc卡死、wincc运行卡在变量记录不动怎么办?WinCC在激活过程中卡住了怎么办?...
- 企立方集团:拼多多推广ROI的计算方式
- 电商数仓(dwd 层)
- 跟我一起学Adams虚拟样机:(一) 运动学仿真基操,以一个简单的曲柄摇杆机构为例
- gis差值分析_arcgis中七种插值方法的对比分析
热门文章
- 自动化处理--python读取word中表格内容
- 计算机科学个人陈述中文,个人陈述样本之计算机科学方向
- Keil运行密钥(pojie)软件运行时,exe程序无法运行/被删除/防火墙警告的解决方法
- python爬取前程无忧_【Python】爬虫框架PySpider爬取前程无忧职位
- 老外说中国姓氏^_^
- 注意,金士顿Kingston KHX-FAN内存散热器,接口是小3pin母头。
- 下午尝试了php上床文件到服务器,终于搞定了。
- 清轩云DS系统源码V2.0最新UI面板分享
- beautifulsoup解析
- Web实现:背景颜色渐变色