Elastic实战:canal自定义客户端,实现mysql多表同步到es
0. 引言
我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。
通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x
canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步
但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步
1. canal简介
anal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等
canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin,同时我们还可以自定义客户端,我们讲自定义的客户端称为client
canal的数据同步流程如下图所示
2. 环境准备
2.1 安装jdk
canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。
canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError
2.2 安装canal
1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。
这里因为我们要自定义客户端,所以只用下载服务端deployer即可
官方下载地址
当然也可以通过wget指令直接下载到服务器
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章
通过canal来实现mysql数据同步到elasticsearch
2.3 mysql配置
1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog
修改mysql配置文件
vim /etc/my.cnf
修改内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
2、源数据库创建一个canal账号,并且设置slave
,dump
权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、因为mysql8.0.3后身份检验方式为caching_sha2_password
,但canal使用的是mysql_native_password
,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;
3. 实操
3.1 服务端deployer配置
1、查询源mysql服务器的binlog位置
# 源mysql服务器中登陆mysql执行
show binary logs;
2、进入deployer安装目录
cd deployer
3、我们新建一个实例es
专门用于本次演示
cd conf
# 复制example实例配置
cp -R example es
4、修改实例es配置文件instance.properties
cd es
vim instance.properties
修改内容
# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\..*
mysql数据同步起点说明:
- canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
- canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
- 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
5、启动服务端
./bin/start.sh
6、查看示例日志,无报错则说明启动成功
cat logs/es/es.log
针对服务端的详细配置项解释,可以参考官方文档:
配置项解释
3.2 自定义客户端client
1、新建一个springboot项目,我们结合之前讲解的spring-data-elasticsearch
来作为es客户端,这里就不单独说明其配置了,还不知道的同学可以参考之前的文章
从零搭建springboot整合spring data elasticsearch4.2.x环境
引入依赖spring-data-elasticsearch
、canal-spring-boot-starter
、mybatis-plus
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-elasticsearch</artifactId><version>4.2.10</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency>
2、修改配置文件application.yml
# 应用名称
spring:application:name: canal_client_eselasticsearch:rest:# es 地址uris: http://192.168.244.11:9200username: elasticpassword: elasticdatasource:driver-class-name: com.mysql.cj.jdbc.Drivername: defaultDataSourceurl: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8username: rootpassword: 123456server:port: 8080# canal服务端地址
canal:server: 192.168.244.22:11111# 实例名,与deployer中配置的保持统一destination: es# 设置canal消息日志打印级别
logging:level:top.javatool.canal.client: warn
3、创建es客户端配置
/*** @author benjamin* @date 2022/10/1*/
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.canal_client_es")
public class ElasticRestClientConfig extends AbstractElasticsearchConfiguration {@Value("${spring.elasticsearch.rest.uris}")private String url;@Value("${spring.elasticsearch.rest.username}")private String username;@Value("${spring.elasticsearch.rest.password}")private String password;@Override@Beanpublic RestHighLevelClient elasticsearchClient() {url = url.replace("http://","");String[] urlArr = url.split(",");HttpHost[] httpPostArr = new HttpHost[urlArr.length];for (int i = 0; i < urlArr.length; i++) {HttpHost httpHost = new HttpHost(urlArr[i].split(":")[0].trim(),Integer.parseInt(urlArr[i].split(":")[1].trim()), "http");httpPostArr[i] = httpHost;}final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));RestClientBuilder builder = RestClient.builder(httpPostArr)// 异步httpclient配置.setHttpClientConfigCallback(httpClientBuilder -> {// 账号密码登录httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);// httpclient连接数配置httpClientBuilder.setMaxConnTotal(30);httpClientBuilder.setMaxConnPerRoute(10);// httpclient保活策略httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));return httpClientBuilder;});return new RestHighLevelClient(builder);}@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient elasticsearchClient,ElasticsearchConverter elasticsearchConverter){return new ElasticsearchRestTemplate(elasticsearchClient,elasticsearchConverter);}}
4、实现根据实体类自动创建es索引的配置类,不需要可跳过这步
@Configuration
@Slf4j
@AllArgsConstructor
public class ElasticCreateIndexStartUp implements ApplicationListener<ContextRefreshedEvent> {private final ElasticsearchRestTemplate restTemplate;@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){log.info("[elastic]索引初始化...");Reflections f = new Reflections("com.example.canal_client_es.entity");Set<Class<?>> classSet = f.getTypesAnnotatedWith(Document.class);for (Class<?> clazz : classSet) {IndexOperations indexOperations = restTemplate.indexOps(clazz);if(!indexOperations.exists()){indexOperations.create();indexOperations.putMapping();log.info(String.format("[elastic]索引%s数据结构创建成功",clazz.getSimpleName()));}}log.info("[elastic]索引初始化完毕");}
}
4、创建订单、商品、收货人实体,其中一个订单下有多个商品、多个收货人,我们希望同步订单表时,将商品、收货人两张表的信息同步更新。
同时因为我们需要与数据库做映射,同时也需要与es做映射,所以需要创建面向mysql和es的实体类,当然你也可以将两种整合到一起(如下所示的商品实体、收货人实体),这里为了让大家清晰的认识,我将其分开(如下所示的订单实体)
es实体类
// 订单实体
@Data
@Document(indexName = "my_order")
@Setting(replicas = 0,shards = 1)
public class Order implements Serializable {/*** 主键*/@Idprivate Long id;/*** 订单号*/@Field(type = FieldType.Keyword, name="seqNo")private String seqNo;/*** 总价*/@Field(type = FieldType.Double, name="totalPrice")private BigDecimal totalPrice;/*** 数量*/@Field(type = FieldType.Integer, name="quantity")private Integer quantity;/*** 商品清单*/@Field(type = FieldType.Nested, name="productList")private List<Product> productList;/*** 收货人清单*/@Field(type = FieldType.Nested, name="userList")private List<User> userList;}// 商品实体
@Data
@Table(name = "product")
public class Product implements Serializable {@Field(type = FieldType.Long, name="id")private Long id;@Field(type = FieldType.Keyword, name="seqNo")@Column(name = "seq_no")private String seqNo;@Field(type = FieldType.Double, name="price")private BigDecimal price;@Field(type = FieldType.Text, name="name", analyzer = "ik_smart")private String name;}// 收货人实体
@Data
@Table(name = "user")
public class User implements Serializable {@Field(type = FieldType.Long, name="id")private Long id;@Field(type = FieldType.Keyword, name="seqNo")@Column(name = "seq_no")private String seqNo;@Field(type = FieldType.Keyword, name="name")private String name;@Field(type = FieldType.Integer, name="age")private Integer age;@Field(type = FieldType.Text, name="address", analyzer = "ik_smart")private String address;
}
数据库实体,并用jpa的注解@Column
来映射字段名。商品、收货人的数据库实体则整合到es实体中了,如上
@Data
@Table(name = "my_order")
public class OrderPO implements Serializable {/*** 主键*/@Column(name = "id")private Long id;/*** 订单号*/@Column(name = "seq_no")private String seqNo;/*** 总价*/@Column(name = "total_price")private BigDecimal totalPrice;/*** 数量*/@Column(name = "quantity")private Integer quantity;}
5、我们基于mybatis-plus来操作数据库,因此需要创建实体的mapper、service。详细的代码大家按照mybatis-plus的用法创建即可,或者通过本文最后下载源码查看。这里不再累叙。
6、操作到这里,最好把你的项目启动一下,如果正常则继续往下操作,如果不正常也好提前排错,不要压到最后发现一堆错,也不知道错在哪里。
7、接下来我们基于canal-client提供的EntryHandler
类来实现对于数据表的监控,从而达到数据的增删改同步
@CanalTable("my_order")
@Component
@AllArgsConstructor
@Slf4j
public class OrderHandler implements EntryHandler<OrderPO> {private final ElasticsearchRestTemplate elasticsearchRestTemplate;private final IProductService productService;private final IUserService userService;@Overridepublic void insert(OrderPO orderPO) {Order order = new Order();BeanUtils.copyProperties(orderPO,order);List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));order.setProductList(productList);List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));order.setUserList(userList);elasticsearchRestTemplate.save(order);}@Overridepublic void update(OrderPO before, OrderPO after) {Order order = new Order();BeanUtils.copyProperties(after,order);List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));order.setProductList(productList);List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));order.setUserList(userList);elasticsearchRestTemplate.save(order);}@Overridepublic void delete(OrderPO orderPO) {elasticsearchRestTemplate.delete(orderPO.getId().toString(),Order.class);}
}
3.3 测试
1、新增一条订单数据
2、kibana中查看索引数据
GET my_order/_search
结果显示新增的订单表同步成功,并且两张子表的数据也成功同步了。
3、再修改一下订单数据
kibana查看索引,显示同步成功
4、我们将刚刚新增的订单数据在数据库中删除
同时kibana中也删除成功,说明我们删除的同步也生效了。
3.4 子表数据修改,同步主表
上述我们演示了主表数据修改时,同步主表以及两张子表的数据;有时我们需要修改子表数据,但也需要实现数据同步。
这就需要我们实现一个子表的EntryHandler
,用于监听子表的数据变化,其逻辑是子表数据更新时,查询主子表的数据,再同步更新到索引中即可。
注意要监听的是子表,每张子表一个监听器,如果需要监听两张子表,那么就需要分别创建两个监听器
@CanalTable("product")
@Component
@AllArgsConstructor
@Slf4j
public class ProductHandler implements EntryHandler<Product> {private final ElasticsearchRestTemplate elasticsearchRestTemplate;@Overridepublic void insert(Product product) {// TODO}@Overridepublic void update(Product before, Product after) {// TODO}@Overridepublic void delete(Product product) {// TODO}
}
演示源码
文中演示源码可在如下地址下载:
git源码地址
总结
自此我们的数据同步就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件canal-spring-boot-starter
极大的简化了我们自定义canal客户端的难度。
不过遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是1.1.3
版本的,不过实测还不影响我们对接canal1.1.6。如果大家对canal客户端又更高性能的需求,可以研究源码,高度二开。
后续我们将给大家讲解如何实现类canal-spring-boot-starter
这样的第三方依赖组件。感兴趣的同学可以关注专栏。
Elastic实战:canal自定义客户端,实现mysql多表同步到es相关推荐
- canal应用二:mysql数据实时同步到redis
前言 在项目开发中,通常使用redis作为数据的缓存,那么经常遇到一个问题,修改MySQL的数据要怎么同步到Redis呢? 方式一:在系统的保存.删除接口同时对redis进行操作,但是存在一个缺点,就 ...
- canal没有监听到mysql,缓存不能同步
在商品管理页面localhost:8081修改ID为10001商品price,打开页面localhost:8081/item/10001查询,price并没有更新,打开RESP.app查看10001的 ...
- 【SpringBoot】65、SpringBoot整合Canal+RabbitMQ监听MySQL实现数据同步更新Redis缓存
canal 简介 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行 ...
- mysql多表同步es一个索引_使用 Logstash 同步多个数据表到 Elasticsearch 同一个索引 (Index)...
最近使用了 ELK 做搜索,出报表,由于我们的业务数据很多,汇总起来有近 200 个字段,刚开始写了一个庞大的视图通过 Logstash 同步到 Elasticsearch,但是遇到了很多问题: 1. ...
- 基于Kafka Debezium Confluent实现MySql实时同步到ES
实现目标 基于MySql的Binlog实现Mysql表实时同步到ES. 实现方案 1.总体技术方案基于Kafka的Connect技术.具体技术内容,不做介绍,网上有相关文章,本文章主 ...
- 使用canal监听binlog将数据发送到RocketMQ同步到es
写在前面 今天不学习,明天变垃圾.最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程 ...
- mysql 分词搜索_实战 | canal 实现Mysql到Elasticsearch实时增量同步
题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...
- 增量同步_实战 | canal 实现Mysql到Elasticsearch实时增量同步
题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...
- 【.NET Core项目实战-统一认证平台】第七章 网关篇-自定义客户端限流
上篇文章我介绍了如何在网关上增加自定义客户端授权功能,从设计到编码实现,一步一步详细讲解,相信大家也掌握了自定义中间件的开发技巧了,本篇我们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础 ...
最新文章
- Android四大基本组件介绍与生命周期
- Win7环境下搭建GO开发平台——SublimeText 2
- java 基础(匿名内部类)
- 重庆python培训-重庆python培训机构排.行榜
- sqlserver执行更新语句失败报错42S22
- python填充空值_python空值_python空值填充_python空值变量 - 云+社区 - 腾讯云
- 预约 .NET Conf: Focus on F# 活动,赢得官方周边!
- HEVC 编解码资源
- windows下mongoDB的环境配置
- JavaScript Module Pattern
- ApacheHttpServer出现启动报错:the requested operation has failed解决办法
- 天梯赛L2-10:排座位
- vector中push_back和emplace_back区别
- Microsoft Offfice 2010 测试版下载
- 运城达内java毕业生分享如何提高网站优化效率
- 计算机视觉论文-2021-03-03
- 吴裕雄--天生自然 诗经:兵车行
- 2012开源中国开源世界高峰论坛有感
- 游戏建模在国内的发展前景,3D建模行业真的很缺人吗?
- el-form-item 正则验证
热门文章
- 刘克亚励志演讲(清晨/夜晚)
- 2019最新Android常用开源库总结(From:知乎)
- 新浪微博松绑140字限制 内容和社交能否兼得?
- 三星手机安装linux系统下载,ubuntu手机系统安装教程【详细步骤】
- Android”挂逼”修炼之行—微信摇骰子和猜拳作弊器原理解析
- Google Earth Engine(GEE)——逐月降水数据下载和直方图表展示
- 农家女靠养花赚钱,年收入几十万
- 英语作文计算机国际会议开幕词,学术会议开幕词英文.doc
- 如何搭建数据指标监测体系?
- c语言printf输出字符表情,C语言中printf输出的奇怪错误