什么是StarRocks?

StarRocks是新一代极速统一的olap新型mpp分析型数据库,全面向量化引擎,全新的CBO优化器,性能强悍,单表查询媲美业界最强悍的clickhouse,支持多表join,支持数据秒级更新;

且同时支持高并发,架构极简,方便运维扩展,完全国产,安全可控,在国内外各行各业已经得到了广泛使用。

StarRocks提供了丰富的数据接入方式:stream load,routine load,broker load,spark load等,对接比如本地文件,对象存储,hdfs,数据库,kafka,还可以使用flink cdc方式同步数据到starrocks,也支持开源工具比如datax,seatunnel等,也定制了flink connector source/sink 到starrocks。具体可以参考官网文档:https://docs.starrocks.com/zh-cn/main/loading/Loading_intro

本文示例如何通过Routine load工具通过kafka将TP类型的增量数据方便快捷同步到StarRocks中(除了下文使用到的方法,也可以使用flink cdc借助flink sql同步)

Routine Load原理:

导入流程如上图:

  1. 用户通过支持MySQL协议的客户端向 FE 提交一个Kafka导入任务。
  2. FE将一个导入任务拆分成若干个Task,每个Task负责导入指定的一部分数据。
  3. 每个Task被分配到指定的 BE 上执行。在 BE 上,一个 Task 被视为一个普通的导入任务, 通过 Stream Load 的导入机制进行导入。
  4. BE导入完成后,向 FE 汇报。
  5. FE 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
  6. FE 会不断的产生新的 Task,来完成数据不间断的导入。

实验环境: Mysql +Canal + Kafka + StarRocks

测试步骤:

1.开启mysql binlog

确认mysql binlog已经开启:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=2       # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

也可以在mysql中通过show variables like '%xxx%'方式确认相关配置已经开启;

2.配置好canal环境,使数据sink到kakfa

配置两个文件,conf/canal.properties, conf/exmaple/instance.properties,启动canal;

3.准备好StarRocks集群

方便测试一个节点就可以,生产环境推荐至少3台服务器以上,分布式部署,多副本,保障数据的不丢失以及服务的高可用;

4.建好kafka topic

kafka中数据格式:

{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790506000,"id":19,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790506948,"type":"INSERT"}
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790577000,"id":20,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790577916,"type":"DELETE"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790789000,"id":21,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790797431,"type":"INSERT"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790832000,"id":22,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790832760,"type":"DELETE"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791354000,"id":23,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791354904,"type":"INSERT"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791385000,"id":24,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791395247,"type":"DELETE"}

可以看到在mysql binlog输出到kafka中的json数据,后面都会有一个type字段,类型为insert,update or delete,StarRocks正是通过去解析这个字段类型,来做后续在内部的添加,更新,删除数据。

5.建好在StarRocks中建好routine load job

create routine load gong.cdc0401 on cdc_0401 columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))PROPERTIES ("format"="json","jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]","desired_concurrent_number"="1", "max_error_number"="1000","max_batch_interval"="5","strict_mode" = "false")FROM KAFKA (   "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092",   "kafka_topic" = "gong_test","kafka_partitions"="0","kafka_offsets"="OFFSET_BEGINNING");

需要注意的是,columns和jsonpath部分较容易弄错,参考StarRocks 论坛文章:https://forum.starrocks.com/t/topic/851

6.验证:测试,在mysql中insert,update,delete数据,是否同步到starrocks

mysql中插入数据,再删除:

MariaDB [gong]> insert into gong_cdc values(98777777,"987",9888888);
Query OK, 1 row affected (0.00 sec)MariaDB [gong]> delete from gong_cdc where id = 98777777;
Query OK, 1 row affected (0.00 sec)mysql> select * from cdc_0401;
+----------+-----------+---------+
| id       | k1        | v1      |
+----------+-----------+---------+
|      321 | 3321      |     321 |
|      444 | main      |       1 |
|      666 | starrocks |     666 |
|      777 | 777       |     777 |
|      888 | 777       |     888 |
|      987 | 987       |     987 |
|    10086 | cheng     |       1 |
|    11111 | sr        |       1 |
|    30003 | sr        |      30 |
|    88888 | 88888     |    8888 |
|   100002 | march     |       1 |
|   100003 | gong      |       1 |
|   200000 | cheng     |       1 |
| 98777777 | 987       | 9888888 |
+----------+-----------+---------+
14 rows in set (0.01 sec)

在StarRocks sql cli端check routine load任务是否正常,以及报错等:

mysql> show routine load\G;
*************************** 1. row ***************************Id: 10252Name: cdc0401CreateTime: 2022-04-01 17:01:15PauseTime: NULLEndTime: NULLDbName: default_cluster:gongTableName: cdc_0401State: RUNNINGDataSourceType: KAFKACurrentTaskNum: 1JobProperties: {"partitions":"*","columnToColumnExpr":"id,k1,v1,temp,__op=(CASE `temp` WHEN 'DELETE' THEN 1 ELSE 0 END)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]","desireTaskConcurrentNum":"1","maxErrorNum":"100000","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"gong_test","currentKafkaPartitions":"0","brokerList":"cs01:9092,cs02:9092,cs03:9092"}CustomProperties: {}Statistic: {"receivedBytes":1787751,"errorRows":3001,"committedTaskNum":3,"loadedRows":41,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":3042,"unselectedRows":0,"receivedBytesRate":198000,"taskExecuteTimeMs":9028}Progress: {"0":"3041"}
ReasonOfStateChanged: ErrorLogUrls: http://172.26.194.184:29122/api/_load_error_log?file=__shard_5/error_log_insert_stmt_5752f798-7efa-47d8-b7ba-fcbc08dcfad5_5752f7987efa47d8_b7bafcbc08dcfad5OtherMsg:
1 row in set (0.00 sec)ERROR:
No query specified

check数据是否同步到StarRocks:

mysql> select * from cdc_0401;
+--------+-----------+------+
| id     | k1        | v1   |
+--------+-----------+------+
|    321 | 3321      |  321 |
|    444 | main      |    1 |
|    666 | starrocks |  666 |
|    777 | 777       |  777 |
|    888 | 777       |  888 |
|    987 | 987       |  987 |
|  10086 | cheng     |    1 |
|  11111 | sr        |    1 |
|  30003 | sr        |   30 |
|  88888 | 88888     | 8888 |
| 100002 | march     |    1 |
| 100003 | gong      |    1 |
| 200000 | cheng     |    1 |
+--------+-----------+------+
13 rows in set (0.00 sec)

查看starrocks的数据,确实先进来,后删除了,同时也可以随时查看routie load job运行状况,确保任务没有异常,这样子数据才能同步进来;

7.测试过程碰到的问题

问题一:创建routine load方式如下

create routine load gong.cdc0401 on cdc_0401
columns(id,k1,v1)
PROPERTIES ( "format"="json", "desired_concurrent_number"="1",  "max_error_number"="100","max_batch_interval"="5","strict_mode" = "false"
)
FROM KAFKA (    "kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092",    "kafka_topic" = "gong_test","kafka_partitions"="0","kafka_offsets"="OFFSET_BEGINNING"
);

发现不管如何测试,在mysql中的insert和update都可以同步到starrocks中,一度以为starrocks官网写得增量同步,只是同步新增或变更的数据,删除不了;

参考StarRoks论坛文档链接:https://docs.starrocks.com/zh-cn/main/loading/Json_loading

强调__op字段

问题二:__op字段配置的问题

1. columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))2. "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",

这两处配置是需要格外注意的地方,可以参考官网论坛链接:https://forum.starrocks.com/t/topic/851

字段配置错误,报错:

Reason: column count mismatch, expect=4 real=1. src line: [{"data":[{"id":"88888","k1":"88888","v1":"8888"}],"database":"gong","es":1648798201000,"id":30,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648798212928,"type":"INSERT"}];

3.由于数据质量等问题引起的null情况,需要配置参数"max_error_number"="100",可以配置为一个较大的值,否则routine load任务会paused

4.在建routine load任务时候,对应字段反引号``引起来了,会报错id取值为空,其他字段取到的值也都为空情况,定位了很久,应该是当前routine load作业的一个小bug,版本2.1.x:

需要将反引号``符号去掉:

使用StarRocks内置工具Routine Load同步Mysql/TiDB/PG等增量更新数据到StarRocks相关推荐

  1. 如何使用Deno的内置工具

    首发于我的博客:https://blog.zhangbing.site/ Deno和Node.js之间一个令人惊讶的区别是运行时内置的工具数量.除了Read-Eval-Print Loop(REPL) ...

  2. Java虚拟机学习总结(3)——JDK内置工具(jps、jstack、jmap、jstat)使用详解

    一.JDK内置工具 - javap 1. 介绍 java 反编译工具,主要用于根据Java字节码文件反汇编为Java源代码文件. 2.命令 javap <options> <clas ...

  3. 利用Windows内置工具winsat测试硬盘速度(SSD机械盘对比)

    利用Windows内置工具winsat测试硬盘速度(SSD&机械盘对比) 以下是红色内容是在命令行运行: C:\Users\Administrator>winsat disk Windo ...

  4. [MODIS数据处理#2]常用的Arcmap内置工具(一)

    上一篇文章中,我们使用MRT工具完成了对MODIS数据的预处理,并对栅格进行了预处理. 接下来的两期是对Arcmap中常用内置工具的介绍,使用的ArcGIS版本为10.2: • 投影栅格 • 裁剪 • ...

  5. JVM-2.jdk自带内置工具

    文章目录 1. 概要 2. JPS 3. jstat---性能分析 4. jmap--堆内存分析 4.1 打印java堆详情 4.2 打印堆中对象的统计信息 4.3 dump出堆内存信息 4.4 触发 ...

  6. (六)Java关于内置工具类

    Java 内置工具类(String.StringBuffer.LinkedList.ArrayList.HashMap等工具) ​ Java提供了异常丰富的工具类数量巨阳之多,对于 Java学习者而言 ...

  7. 很好用!多款快充移动电源内置智融SW7201双向同步升降压控制器!

    大家都知道PD3.0协议最大支持20V/5A 100W功率,PD3.1协议进一步提高到了48V/5A 240W,这大大拓展了PD快充的应用领域,移动电源的市场已经迎来了快充时代. 经过拆解和整理,今天 ...

  8. python update函数会调用哪些内置函数_Python字典的内置函数中没有 append()操作,可以用 update()来更新字典内容...

    Python字典的内置函数中没有 append()操作,可以用 update()来更新字典内容 答:√ 在同一代昭穆的兄弟中,以年岁序位. 答:对 <舆服志>主要描写了唐代的流行服饰样式, ...

  9. python内置函数系列之set(一)(持续更新)

    python内置函数系列之set(一)(持续更新) 查看python中set介绍(ctrl + 鼠标左键): 有如下介绍: """set() -> new empt ...

  10. 使用Keil内置工具一键为工程安装FreeRTOS

    最近正在学习FreeRTOS, 觉得每次都要照着流程手动给工程安装, 很麻烦. 一天在无聊乱翻Keil时, 发现Keil内部居然自带FreeRTOS包. 废话不多说, 马上开始操作. 软硬件详情 平台 ...

最新文章

  1. Oracle 添加RAC数据库集群节点(一)
  2. 比特币和比特币现金就隐私保护展开辩论
  3. 面试:字符串拼接,什么时候用StringBuilder?
  4. C#自定义控件,在项目工具箱中加入自定义控件,调用自定义控件
  5. Codeforces 1182A Filling Shapes
  6. 【SSO-CAS】sso 之 cas 实现的几个问题
  7. 益智小游戏(app)
  8. FR跨SHEET条件汇总
  9. android 微信地图定位失败,微信端H5使用百度地图定位获取当前位置安卓定位不准...
  10. visual C#(二十五)实现UWP应用的用户界面
  11. 135、137、138、139和445端口
  12. When I’m old and dying
  13. Feign与Gateway
  14. DEVc++迷你世界0.10.0
  15. 从零搭建SSR+VUE框架(附源码)
  16. 阿里云域名证书免费认证教程
  17. Alpha阶段敏捷冲刺⑤
  18. [设计模式]设计模式SOLID原则
  19. 【kimol君的无聊小发明】—用python写截屏小工具
  20. SCI与EI检索是什么意思,包括哪些专业?

热门文章

  1. 秃头不用怕!程序员脱发自救指南来了
  2. jQuery + JavaScript 实现的动态添加文本框功能 和 动态删除文本框功能(二)
  3. Android网络编程(一次网络请求)
  4. CentOS7清理磁盘空间
  5. phpstudy使用教程(一)
  6. Foundation框架之字符串和日期
  7. java重载和重写的区别_Java重载和重写的区别
  8. json的格式是什么?
  9. 分享十点C语言入门知识
  10. c语言汉字属于什么类型_【C语言】必学知识点 - 基本数据类型!你学会了吗?...