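Using StarRocks' built-in Routine Load to sync incremental updates from MySQL/TiDB/PG into StarRocks
What is StarRocks?
StarRocks is a new-generation, high-performance, unified OLAP database with an MPP architecture, a fully vectorized execution engine, and a new cost-based optimizer (CBO). Its single-table query performance rivals ClickHouse, widely regarded as the fastest in the industry, and it also supports multi-table joins and second-level data updates.
It also supports high concurrency, has a very simple architecture that is easy to operate and scale, is fully developed in China with independent control over its security, and is already widely used across industries in China and abroad.
StarRocks provides a rich set of data ingestion methods (Stream Load, Routine Load, Broker Load, Spark Load, and others) that connect to sources such as local files, object storage, HDFS, databases, and Kafka. Data can also be synced into StarRocks with Flink CDC, with open-source tools such as DataX and SeaTunnel, or with the dedicated Flink connector (source/sink) for StarRocks. For details, see the official documentation: https://docs.starrocks.com/zh-cn/main/loading/Loading_intro
This article shows how to use Routine Load to sync incremental data from transactional (TP) databases into StarRocks quickly and easily through Kafka. (Besides the method used below, you can also sync the data with Flink CDC using Flink SQL.)
How Routine Load works:
The load flow is as follows:
- The user submits a Kafka load job to the FE through any client that speaks the MySQL protocol.
- The FE splits the load job into several tasks, each responsible for loading a specific portion of the data.
- Each task is assigned to a BE for execution. On the BE, a task is treated as an ordinary load and is imported through the Stream Load mechanism.
- When the BE finishes loading, it reports back to the FE.
- Based on the report, the FE either generates the next tasks or retries the failed ones.
- The FE keeps generating new tasks so that data is loaded continuously.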
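The loop above can be pictured with a toy model. The sketch below is not StarRocks code: it assumes one task per Kafka partition per round, a fixed batch size standing in for limits such as max_batch_rows and max_batch_interval, round-robin assignment of tasks to BEs, and a caller-supplied load function standing in for the BE-side Stream Load. It only shows how FE-tracked offsets advance on success and how a failed range is retried.

```python
"""Toy model of the Routine Load scheduling loop (not StarRocks code)."""
from dataclasses import dataclass
from itertools import cycle
from typing import Callable, Dict, List


@dataclass
class Task:
    partition: int
    start_offset: int
    end_offset: int  # exclusive
    backend: str


def run_routine_load(
    partition_end: Dict[int, int],   # partition -> latest offset in Kafka
    backends: List[str],
    batch_rows: int,                 # assumed fixed batch size per task
    load: Callable[[Task], bool],    # simulates the BE-side Stream Load
    max_rounds: int = 100,
) -> Dict[int, int]:
    committed = {p: 0 for p in partition_end}  # progress tracked by the FE
    be_cycle = cycle(backends)
    for _ in range(max_rounds):
        pending = [p for p in partition_end if committed[p] < partition_end[p]]
        if not pending:
            break  # a real job keeps polling Kafka; the toy stops here
        for p in pending:
            start = committed[p]
            end = min(start + batch_rows, partition_end[p])
            task = Task(p, start, end, next(be_cycle))  # FE assigns the task to a BE
            if load(task):           # BE reports success to the FE
                committed[p] = end   # FE advances progress for the next task
            # on failure the offsets stay put, so the same range is retried
    return committed
```

Because the committed offset only moves after a successful report, a failed task's range is simply reissued in the next round, which is the retry behavior described in the list.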
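Test environment: MySQL + Canal + Kafka + StarRocks
Test steps:
1. Enable the MySQL binlog
Make sure the MySQL binlog is enabled:
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=2 # required for MySQL replication; must not equal Canal's slaveId
You can also confirm in MySQL that these settings are active with show variables like '%xxx%', for example show variables like 'log_bin'; and show variables like 'binlog_format';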
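To check the [mysqld] settings before starting Canal, a small script can parse the option file. This is a sketch: it assumes the settings live in a [mysqld] section that Python's configparser can read (true for simple files like the one above, but not for every option-file feature, e.g. !include lines), and the Canal slaveId passed in is a hypothetical value you would take from your own instance.properties.

```python
import configparser


def check_binlog_config(cnf_text: str, canal_slave_id: int) -> list:
    """Return the problems found in the [mysqld] section (empty list if OK)."""
    # "# ..." after a value is treated as an inline comment, as in the sample file
    parser = configparser.ConfigParser(inline_comment_prefixes=("#",), allow_no_value=True)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    mysqld = parser["mysqld"]
    problems = []
    if "log-bin" not in mysqld and "log_bin" not in mysqld:
        problems.append("binlog is not enabled (log-bin)")
    fmt = mysqld.get("binlog-format", mysqld.get("binlog_format", ""))
    if (fmt or "").upper() != "ROW":
        problems.append("binlog format must be ROW")
    server_id = mysqld.get("server_id", mysqld.get("server-id"))
    if server_id is None:
        problems.append("server_id is not set")
    elif int(server_id) == canal_slave_id:
        problems.append("server_id must differ from Canal's slaveId")
    return problems
```

The check mirrors the three comments in the config: binlog on, ROW format, and a server_id distinct from Canal's.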
2. Set up Canal so that it sinks the binlog data to Kafka
Configure two files, conf/canal.properties and conf/example/instance.properties, then start Canal.
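A minimal sketch of the Canal settings involved, rendered from Python so they can be reviewed or merged into the two files. The property names follow the Canal 1.1.5 documentation as I understand it (Canal 1.1.4 uses canal.mq.servers instead of kafka.bootstrap.servers), so check them against your Canal version; the MySQL address, credentials, and table filter below are placeholders. canal.mq.flatMessage=true is what produces flat JSON messages like the ones in step 4, and canal.mq.partition=0 matches "kafka_partitions"="0" in the routine load job.

```python
def render(props: dict) -> str:
    """Render a dict as Java .properties lines."""
    return "\n".join(f"{k}={v}" for k, v in props.items()) + "\n"


def canal_kafka_config(mysql_addr: str, db_user: str, db_password: str,
                       brokers: str, topic: str, table_regex: str):
    # conf/canal.properties: run the server in Kafka mode with flat JSON messages
    canal_properties = {
        "canal.serverMode": "kafka",
        "kafka.bootstrap.servers": brokers,
        "canal.mq.flatMessage": "true",
    }
    # conf/example/instance.properties: source MySQL, table filter, target topic
    instance_properties = {
        "canal.instance.master.address": mysql_addr,
        "canal.instance.dbUsername": db_user,
        "canal.instance.dbPassword": db_password,
        "canal.instance.filter.regex": table_regex,
        "canal.mq.topic": topic,
        "canal.mq.partition": "0",
    }
    return render(canal_properties), render(instance_properties)


canal_conf, instance_conf = canal_kafka_config(
    "127.0.0.1:3306", "canal", "canal",            # placeholders
    "cs01:9092,cs02:9092,cs03:9092", "gong_test", "gong\\.gong_cdc")
print(canal_conf)
print(instance_conf)
```

Merge these keys into the existing files rather than overwriting them; the shipped files contain many other settings.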
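3. Prepare the StarRocks cluster
A single node is enough for testing. For production, a distributed deployment on at least three servers with multiple replicas is recommended, to prevent data loss and keep the service highly available.
4. Create the Kafka topic
Data format in Kafka: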
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790506000,"id":19,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790506948,"type":"INSERT"}
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790577000,"id":20,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790577916,"type":"DELETE"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790789000,"id":21,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790797431,"type":"INSERT"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790832000,"id":22,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790832760,"type":"DELETE"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791354000,"id":23,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791354904,"type":"INSERT"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791385000,"id":24,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791395247,"type":"DELETE"}
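As the messages show, every JSON record that the MySQL binlog produces in Kafka ends with a type field whose value is INSERT, UPDATE, or DELETE. StarRocks uses this field, through the routine load job's column mapping, to decide whether to insert, update, or delete the corresponding rows.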
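The mapping from type to an insert/update/delete decision can be sketched in a few lines. This is an illustration, not what StarRocks runs internally; it assumes Canal's flat JSON format shown above and uses StarRocks' __op convention (0 = upsert, 1 = delete). INSERT and UPDATE both become upserts, because an update simply overwrites the row with the same primary key.

```python
import json

# __op values understood by StarRocks Primary Key tables
OP_UPSERT, OP_DELETE = 0, 1


def canal_to_ops(message: str) -> list:
    """Turn one Canal flat-JSON message into (row, __op) pairs."""
    msg = json.loads(message)
    if msg.get("isDdl"):
        return []  # DDL events carry no row data to load
    op = OP_DELETE if msg["type"] == "DELETE" else OP_UPSERT
    # "data" holds one dict per changed row; UPDATE rows carry the new values
    return [(row, op) for row in (msg.get("data") or [])]
```

Unlike the jsonpaths used in the next step, this sketch walks every element of data, not just the first.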
5. Create the routine load job in StarRocks
create routine load gong.cdc0401 on cdc_0401
columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES (
    "format"="json",
    "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",
    "desired_concurrent_number"="1",
    "max_error_number"="1000",
    "max_batch_interval"="5",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092",
    "kafka_topic" = "gong_test",
    "kafka_partitions"="0",
    "kafka_offsets"="OFFSET_BEGINNING"
);
Note that the columns and jsonpaths settings are easy to get wrong; see this StarRocks forum post: https://forum.starrocks.com/t/topic/851
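The two settings work together: each jsonpath extracts one value, the values fill the columns in order (id, k1, v1, temp), and __op is then derived from temp. The sketch below mimics that pipeline for a single message. It supports only the $.key and [index] steps used in this job, not full JSONPath, and it is an illustration rather than StarRocks' own parser.

```python
import json
import re

JSONPATHS = ["$.data[0].id", "$.data[0].k1", "$.data[0].v1", "$.type"]
SOURCE_COLUMNS = ["id", "k1", "v1", "temp"]  # the plain entries of columns(...)

# One path step: either ".key" or "[index]"
_STEP = re.compile(r"\.([A-Za-z_]\w*)|\[(\d+)\]")


def eval_path(doc, path: str):
    """Evaluate a tiny subset of JSONPath ($.key and [index] steps only)."""
    if not path.startswith("$"):
        raise ValueError(f"path must start with $: {path}")
    value = doc
    for key, index in _STEP.findall(path[1:]):
        value = value[int(index)] if index else value[key]
    return value


def to_row(message: str) -> dict:
    doc = json.loads(message)
    row = dict(zip(SOURCE_COLUMNS, (eval_path(doc, p) for p in JSONPATHS)))
    # __op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END); temp itself is not stored
    temp = row.pop("temp")
    row["__op"] = 1 if temp == "DELETE" else 0
    return row
```

One consequence worth checking against your own data: $.data[0] reads only the first element of data. If Canal packs several rows into one message (for example from a multi-row statement), this configuration would load only the first of them, which to_row on a two-row message also shows.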
6. Verification: insert, update, and delete data in MySQL and check whether the changes reach StarRocks
Insert a row in MySQL, then delete it:
MariaDB [gong]> insert into gong_cdc values(98777777,"987",9888888);
Query OK, 1 row affected (0.00 sec)
MariaDB [gong]> delete from gong_cdc where id = 98777777;
Query OK, 1 row affected (0.00 sec)
mysql> select * from cdc_0401;
+----------+-----------+---------+
| id       | k1        | v1      |
+----------+-----------+---------+
|      321 | 3321      |     321 |
|      444 | main      |       1 |
|      666 | starrocks |     666 |
|      777 | 777       |     777 |
|      888 | 777       |     888 |
|      987 | 987       |     987 |
|    10086 | cheng     |       1 |
|    11111 | sr        |       1 |
|    30003 | sr        |      30 |
|    88888 | 88888     |    8888 |
|   100002 | march     |       1 |
|   100003 | gong      |       1 |
|   200000 | cheng     |       1 |
| 98777777 | 987       | 9888888 |
+----------+-----------+---------+
14 rows in set (0.01 sec)
In the StarRocks SQL client, check that the routine load job is running normally and whether it reports errors:
mysql> show routine load\G
*************************** 1. row ***************************
                  Id: 10252
                Name: cdc0401
          CreateTime: 2022-04-01 17:01:15
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:gong
           TableName: cdc_0401
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"id,k1,v1,temp,__op=(CASE `temp` WHEN 'DELETE' THEN 1 ELSE 0 END)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]","desireTaskConcurrentNum":"1","maxErrorNum":"100000","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"gong_test","currentKafkaPartitions":"0","brokerList":"cs01:9092,cs02:9092,cs03:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":1787751,"errorRows":3001,"committedTaskNum":3,"loadedRows":41,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":3042,"unselectedRows":0,"receivedBytesRate":198000,"taskExecuteTimeMs":9028}
            Progress: {"0":"3041"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.194.184:29122/api/_load_error_log?file=__shard_5/error_log_insert_stmt_5752f798-7efa-47d8-b7ba-fcbc08dcfad5_5752f7987efa47d8_b7bafcbc08dcfad5
            OtherMsg:
1 row in set (0.00 sec)
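The Statistic field is JSON, so it is easy to check from a monitoring script. A sketch that summarizes it, using the field names from the output above (the helper name is hypothetical). Applied to this output it reports 41 loaded rows and 3001 error rows out of 3042, an error ratio of about 98.7%, so most rows in this run were counted as errors and the ErrorLogUrls link is worth opening.

```python
import json


def summarize_statistic(statistic: str) -> dict:
    """Summarize the Statistic JSON of one SHOW ROUTINE LOAD row."""
    s = json.loads(statistic)
    total = s["totalRows"]
    return {
        "loaded": s["loadedRows"],
        "errors": s["errorRows"],
        "error_ratio": s["errorRows"] / total if total else 0.0,
        # in the output above, loaded + error + unselected rows add up to totalRows
        "consistent": s["loadedRows"] + s["errorRows"] + s["unselectedRows"] == total,
        "aborted_tasks": s["abortedTaskNum"],
    }
```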
Check whether the data has been synced to StarRocks:
mysql> select * from cdc_0401;
+--------+-----------+------+
| id     | k1        | v1   |
+--------+-----------+------+
|    321 | 3321      |  321 |
|    444 | main      |    1 |
|    666 | starrocks |  666 |
|    777 | 777       |  777 |
|    888 | 777       |  888 |
|    987 | 987       |  987 |
|  10086 | cheng     |    1 |
|  11111 | sr        |    1 |
|  30003 | sr        |   30 |
|  88888 | 88888     | 8888 |
| 100002 | march     |    1 |
| 100003 | gong      |    1 |
| 200000 | cheng     |    1 |
+--------+-----------+------+
13 rows in set (0.00 sec)
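Querying StarRocks confirms that the row arrived first and was then deleted. You can also check the status of the routine load job at any time; data only keeps flowing in while the job runs without errors.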
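Conceptually, the verification relies on Primary Key table semantics: events are applied in order, an upsert replaces the row with the same key, and a delete removes it. A toy in-memory model (a Python dict keyed by id, not StarRocks' storage engine) reproduces the observed insert-then-delete of id 98777777.

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]


def apply_events(table: Dict[str, Row], events: List[Tuple[Row, int]],
                 key: str = "id") -> Dict[str, Row]:
    """Apply (row, __op) pairs in order: 0 = upsert by primary key, 1 = delete."""
    for row, op in events:
        if op == 1:
            table.pop(row[key], None)   # deleting a missing key is a no-op
        else:
            table[row[key]] = dict(row)  # insert, or overwrite on update
    return table


table: Dict[str, Row] = {}
row = {"id": "98777777", "k1": "987", "v1": "9888888"}
apply_events(table, [(row, 0)])   # MySQL insert arrives: row is visible
apply_events(table, [(row, 1)])   # MySQL delete arrives: row is gone
```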
7. Problems encountered during testing
Problem 1: the routine load job was first created as follows:
create routine load gong.cdc0401 on cdc_0401
columns(id,k1,v1)
PROPERTIES ( "format"="json", "desired_concurrent_number"="1", "max_error_number"="100","max_batch_interval"="5","strict_mode" = "false"
)
FROM KAFKA ( "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092", "kafka_topic" = "gong_test","kafka_partitions"="0","kafka_offsets"="OFFSET_BEGINNING"
);
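No matter how I tested, inserts and updates in MySQL were synced to StarRocks but deletes were not, and for a while I assumed that the incremental sync described on the StarRocks website only covered new or changed data and could not apply deletes.
The StarRocks documentation explains the cause: https://docs.starrocks.com/zh-cn/main/loading/Json_loading
It emphasizes the __op field: on a Primary Key model table, a row with __op = 0 is upserted and a row with __op = 1 is deleted. Without __op in columns, every row is loaded as an upsert, so deletes never take effect.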
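Problem 1 can be reproduced with a toy replay of the Kafka events, with and without the __op mapping. Assumptions: when columns has no __op, every row is treated as an upsert, and the dict below stands in for a Primary Key table.

```python
def replay(events, map_op: bool) -> dict:
    """Replay (type, row) events; map_op mirrors having __op in columns(...)."""
    table = {}
    for event_type, row in events:
        # Without the __op mapping the op defaults to 0 (upsert) for every row
        op = 1 if (map_op and event_type == "DELETE") else 0
        if op == 1:
            table.pop(row["id"], None)
        else:
            table[row["id"]] = row
    return table


events = [
    ("INSERT", {"id": "401", "k1": "st", "v1": "401"}),
    ("DELETE", {"id": "401", "k1": "st", "v1": "401"}),
]
print(replay(events, map_op=False))  # the deleted row survives, as in Problem 1
print(replay(events, map_op=True))   # the delete is applied
```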
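Problem 2: configuring the __op field
1. columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
2. "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",
These two settings need particular care; see this StarRocks forum post: https://forum.starrocks.com/t/topic/851
When the fields are misconfigured, the job reports an error like this: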
Reason: column count mismatch, expect=4 real=1. src line: [{"data":[{"id":"88888","k1":"88888","v1":"8888"}],"database":"gong","es":1648798201000,"id":30,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648798212928,"type":"INSERT"}];
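Since the error is a column count mismatch, one quick pre-flight check is to compare the number of plain (non-derived) entries in columns with the number of jsonpaths. This is a hypothetical helper, not StarRocks' own validation, and a matching count does not rule out other causes, such as paths pointing at the wrong JSON level.

```python
import json


def split_columns(columns_clause: str) -> list:
    """Split 'id,k1,v1,temp,__op=(CASE ...)' at top-level commas only."""
    parts, depth, current = [], 0, ""
    for ch in columns_clause:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append(current.strip())
            current = ""
        else:
            current += ch
    if current.strip():
        parts.append(current.strip())
    return parts


def check_mapping(columns_clause: str, jsonpaths: str):
    """Return a description of the mismatch, or None if the counts agree."""
    source = [c for c in split_columns(columns_clause) if "=" not in c]  # read from JSON
    paths = json.loads(jsonpaths)
    if len(source) != len(paths):
        return f"{len(source)} source columns but {len(paths)} jsonpaths"
    return None
```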
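Problem 3: null values caused by data quality issues count as error rows, so configure "max_error_number" (for example "100", or an even larger value); otherwise the routine load job will be paused.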
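As I understand the Routine Load documentation of this period, max_error_number limits the number of error rows within a sampling window of max_batch_rows * 10 rows, and the job is paused as soon as one window exceeds it; please verify this against the docs for your version. The sketch below models that assumed rule.

```python
from typing import Iterable


def should_pause(row_errors: Iterable[bool], max_error_number: int,
                 max_batch_rows: int) -> bool:
    """row_errors: one bool per row, True if that row failed to load."""
    window = max_batch_rows * 10  # assumed sampling window size
    rows_in_window = errors_in_window = 0
    for failed in row_errors:
        rows_in_window += 1
        errors_in_window += failed
        if errors_in_window > max_error_number:
            return True  # threshold exceeded inside the current window
        if rows_in_window == window:
            rows_in_window = errors_in_window = 0  # start a new window
    return False
```

Under this rule, errors do not accumulate across windows, which is why raising max_error_number lets a job tolerate a steady trickle of bad rows.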
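Problem 4: when the column names in the routine load job were wrapped in backticks (``), id was loaded as empty, and so were all the other columns. This took a long time to track down and appears to be a small bug in Routine Load in version 2.1.x.
The fix is to remove the backticks, i.e. write columns(id,k1,v1,...) rather than columns(`id`,`k1`,`v1`,...).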
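If you generate the statement from a script, it is easy to strip backticks from column names up front. Below is a hypothetical helper (not part of any StarRocks tool) that assembles a statement shaped like the one in step 5; it also escapes the double quotes inside jsonpaths, another detail that is easy to get wrong by hand.

```python
def sql_props(props: dict) -> str:
    # Written as "key"="value"; double quotes inside a value (such as the
    # jsonpaths array) must be escaped as \" inside the SQL string.
    return ", ".join('"{}"="{}"'.format(k, v.replace('"', '\\"')) for k, v in props.items())


def build_routine_load(job: str, table: str, columns: list, derived: list,
                       properties: dict, kafka: dict) -> str:
    # Strip backticks: on 2.1.x quoted column names led to empty values in this test
    plain = [c.strip("`") for c in columns]
    cols = ",".join(plain + derived)
    return (
        f"create routine load {job} on {table} columns({cols}) "
        f"PROPERTIES ({sql_props(properties)}) "
        f"FROM KAFKA ({sql_props(kafka)});"
    )


stmt = build_routine_load(
    "gong.cdc0401", "cdc_0401",
    ["`id`", "`k1`", "`v1`", "temp"],
    ['__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END)'],
    {"format": "json", "jsonpaths": '["$.data[0].id","$.type"]'},
    {"kafka_topic": "gong_test"},
)
print(stmt)
```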