0. 引言

我们之前讲解了利用canal实现无代码入侵的同步mysql数据到elasticsearch,并且讲解了主子表数据如何同步。

通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x
canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步

但具体生产中,仍然有更加复杂的同步需求,之前也有几位同学咨询过我,因为canal只支持2张表的数据同步,并不支持3张表及以上的同步,当不少的业务需要3表以上的同步,这就需要我们自定义canal客户端来实现了,那么今天我们就来实操演示下自定义canal客户端,实现多表同步

1. canal简介

anal是阿里开源的数据同步工具,基于bin log可以将数据库同步到其他各类数据库中,目标数据库支持mysql,postgresql,oracle,redis,MQ,ES等

canal分成服务端deployer和客户端adapter,我们可以部署多个,同时为了方便管理还提供了一个管理端admin,同时我们还可以自定义客户端,我们讲自定义的客户端称为client

canal的数据同步流程如下图所示

2. 环境准备

2.1 安装jdk

canal是基于java环境的,因此运行前需要先安装jdk,这里我安装的是jdk11。详细步骤就不再累述了。

canal1.1.5使用jdk1.8即可,以下示例的是canal1.1.6。该版本需要使用jdk11+,否则会报错NoSuchMethodError

2.2 安装canal

1、截止本文,canal的稳定版已更新到1.1.6了, 所以本文也以这个版本为例。

这里因为我们要自定义客户端,所以只用下载服务端deployer即可

官方下载地址

当然也可以通过wget指令直接下载到服务器

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

详细的安装步骤不再累述了,还不清楚的同学可以参考上一篇文章

通过canal来实现mysql数据同步到elasticsearch

2.3 mysql配置

1、因为同步是基于binlog实现的,所以要现在mysql中开启binlog

修改mysql配置文件

vim /etc/my.cnf

修改内容

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式

2、源数据库创建一个canal账号,并且设置slave,dump权限

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;

3. 实操

3.1 服务端deployer配置

1、查询源mysql服务器的binlog位置

# 源mysql服务器中登陆mysql执行
show binary logs;

2、进入deployer安装目录

cd deployer

3、我们新建一个实例es专门用于本次演示

cd conf
# 复制example实例配置
cp -R example es

4、修改实例es配置文件instance.properties

cd es
vim instance.properties

修改内容

# position info
# 源数据库地址及端口
canal.instance.master.address=192.168.244.17:3306
# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000000# 数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal# 配置不同步mysql库
canal.instance.filter.black.regex=mysql\..*

mysql数据同步起点说明:

  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

5、启动服务端

./bin/start.sh

6、查看示例日志,无报错则说明启动成功

cat logs/es/es.log

针对服务端的详细配置项解释,可以参考官方文档:

配置项解释

3.2 自定义客户端client

1、新建一个springboot项目,我们结合之前讲解的spring-data-elasticsearch来作为es客户端,这里就不单独说明其配置了,还不知道的同学可以参考之前的文章

从零搭建springboot整合spring data elasticsearch4.2.x环境

引入依赖spring-data-elasticsearchcanal-spring-boot-startermybatis-plus

     <dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-elasticsearch</artifactId><version>4.2.10</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency>

2、修改配置文件application.yml

# 应用名称
spring:application:name: canal_client_eselasticsearch:rest:# es 地址uris: http://192.168.244.11:9200username: elasticpassword: elasticdatasource:driver-class-name: com.mysql.cj.jdbc.Drivername: defaultDataSourceurl: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8username: rootpassword: 123456server:port: 8080# canal服务端地址
canal:server: 192.168.244.22:11111# 实例名,与deployer中配置的保持统一destination: es# 设置canal消息日志打印级别
logging:level:top.javatool.canal.client: warn

3、创建es客户端配置


/*** @author benjamin* @date 2022/10/1*/
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.canal_client_es")
public class ElasticRestClientConfig extends AbstractElasticsearchConfiguration {@Value("${spring.elasticsearch.rest.uris}")private String url;@Value("${spring.elasticsearch.rest.username}")private String username;@Value("${spring.elasticsearch.rest.password}")private String password;@Override@Beanpublic RestHighLevelClient elasticsearchClient() {url = url.replace("http://","");String[] urlArr = url.split(",");HttpHost[] httpPostArr = new HttpHost[urlArr.length];for (int i = 0; i < urlArr.length; i++) {HttpHost httpHost = new HttpHost(urlArr[i].split(":")[0].trim(),Integer.parseInt(urlArr[i].split(":")[1].trim()), "http");httpPostArr[i] = httpHost;}final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));RestClientBuilder builder = RestClient.builder(httpPostArr)// 异步httpclient配置.setHttpClientConfigCallback(httpClientBuilder -> {// 账号密码登录httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);// httpclient连接数配置httpClientBuilder.setMaxConnTotal(30);httpClientBuilder.setMaxConnPerRoute(10);// httpclient保活策略httpClientBuilder.setKeepAliveStrategy(((response, context) -> Duration.ofMinutes(5).toMillis()));return httpClientBuilder;});return new RestHighLevelClient(builder);}@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient elasticsearchClient,ElasticsearchConverter elasticsearchConverter){return new ElasticsearchRestTemplate(elasticsearchClient,elasticsearchConverter);}}

4、实现根据实体类自动创建es索引的配置类,不需要可跳过这步

@Configuration
@Slf4j
@AllArgsConstructor
public class ElasticCreateIndexStartUp implements ApplicationListener<ContextRefreshedEvent> {private final ElasticsearchRestTemplate restTemplate;@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent){log.info("[elastic]索引初始化...");Reflections f = new Reflections("com.example.canal_client_es.entity");Set<Class<?>> classSet = f.getTypesAnnotatedWith(Document.class);for (Class<?> clazz : classSet) {IndexOperations indexOperations = restTemplate.indexOps(clazz);if(!indexOperations.exists()){indexOperations.create();indexOperations.putMapping();log.info(String.format("[elastic]索引%s数据结构创建成功",clazz.getSimpleName()));}}log.info("[elastic]索引初始化完毕");}
}

4、创建订单、商品、收货人实体,其中一个订单下有多个商品、多个收货人,我们希望同步订单表时,将商品、收货人两张表的信息同步更新。

同时因为我们需要与数据库做映射,同时也需要与es做映射,所以需要创建面向mysql和es的实体类,当然你也可以将两种整合到一起(如下所示的商品实体、收货人实体),这里为了让大家清晰的认识,我将其分开(如下所示的订单实体)

es实体类

// 订单实体
@Data
@Document(indexName = "my_order")
@Setting(replicas = 0,shards = 1)
public class Order implements Serializable {/*** 主键*/@Idprivate Long id;/*** 订单号*/@Field(type = FieldType.Keyword, name="seqNo")private String seqNo;/*** 总价*/@Field(type = FieldType.Double, name="totalPrice")private BigDecimal totalPrice;/*** 数量*/@Field(type = FieldType.Integer, name="quantity")private Integer quantity;/*** 商品清单*/@Field(type = FieldType.Nested, name="productList")private List<Product> productList;/*** 收货人清单*/@Field(type = FieldType.Nested, name="userList")private List<User> userList;}// 商品实体
@Data
@Table(name = "product")
public class Product implements Serializable {@Field(type = FieldType.Long, name="id")private Long id;@Field(type = FieldType.Keyword, name="seqNo")@Column(name = "seq_no")private String seqNo;@Field(type = FieldType.Double, name="price")private BigDecimal price;@Field(type = FieldType.Text, name="name", analyzer = "ik_smart")private String name;}// 收货人实体
@Data
@Table(name = "user")
public class User implements Serializable {@Field(type = FieldType.Long, name="id")private Long id;@Field(type = FieldType.Keyword, name="seqNo")@Column(name = "seq_no")private String seqNo;@Field(type = FieldType.Keyword, name="name")private String name;@Field(type = FieldType.Integer, name="age")private Integer age;@Field(type = FieldType.Text, name="address", analyzer = "ik_smart")private String address;
}

数据库实体,并用jpa的注解@Column来映射字段名。商品、收货人的数据库实体则整合到es实体中了,如上

@Data
@Table(name = "my_order")
public class OrderPO implements Serializable {/*** 主键*/@Column(name = "id")private Long id;/*** 订单号*/@Column(name = "seq_no")private String seqNo;/*** 总价*/@Column(name = "total_price")private BigDecimal totalPrice;/*** 数量*/@Column(name = "quantity")private Integer quantity;}

5、我们基于mybatis-plus来操作数据库,因此需要创建实体的mapper、service。详细的代码大家按照mybatis-plus的用法创建即可,或者通过本文最后下载源码查看。这里不再累叙。

6、操作到这里,最好把你的项目启动一下,如果正常则继续往下操作,如果不正常也好提前排错,不要压到最后发现一堆错,也不知道错在哪里。

7、接下来我们基于canal-client提供的EntryHandler类来实现对于数据表的监控,从而达到数据的增删改同步

@CanalTable("my_order")
@Component
@AllArgsConstructor
@Slf4j
public class OrderHandler implements EntryHandler<OrderPO> {private final ElasticsearchRestTemplate elasticsearchRestTemplate;private final IProductService productService;private final IUserService userService;@Overridepublic void insert(OrderPO orderPO) {Order order = new Order();BeanUtils.copyProperties(orderPO,order);List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));order.setProductList(productList);List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));order.setUserList(userList);elasticsearchRestTemplate.save(order);}@Overridepublic void update(OrderPO before, OrderPO after) {Order order = new Order();BeanUtils.copyProperties(after,order);List<Product> productList = productService.list(Wrappers.<Product>lambdaQuery().eq(Product::getSeqNo, order.getSeqNo()));order.setProductList(productList);List<User> userList = userService.list(Wrappers.<User>lambdaQuery().eq(User::getSeqNo, order.getSeqNo()));order.setUserList(userList);elasticsearchRestTemplate.save(order);}@Overridepublic void delete(OrderPO orderPO) {elasticsearchRestTemplate.delete(orderPO.getId().toString(),Order.class);}
}

3.3 测试

1、新增一条订单数据

2、kibana中查看索引数据

GET my_order/_search

结果显示新增的订单表同步成功,并且两张子表的数据也成功同步了。

3、再修改一下订单数据

kibana查看索引,显示同步成功

4、我们将刚刚新增的订单数据在数据库中删除

同时kibana中也删除成功,说明我们删除的同步也生效了。

3.4 子表数据修改,同步主表

上述我们演示了主表数据修改时,同步主表以及两张子表的数据;有时我们需要修改子表数据,但也需要实现数据同步。

这就需要我们实现一个子表的EntryHandler,用于监听子表的数据变化,其逻辑是子表数据更新时,查询主子表的数据,再同步更新到索引中即可。

注意要监听的是子表,每张子表一个监听器,如果需要监听两张子表,那么就需要分别创建两个监听器

@CanalTable("product")
@Component
@AllArgsConstructor
@Slf4j
public class ProductHandler implements EntryHandler<Product> {private final ElasticsearchRestTemplate elasticsearchRestTemplate;@Overridepublic void insert(Product product) {// TODO}@Overridepublic void update(Product before, Product after) {// TODO}@Overridepublic void delete(Product product) {// TODO}
}

演示源码

文中演示源码可在如下地址下载:

git源码地址

总结

自此我们的数据同步就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件canal-spring-boot-starter极大的简化了我们自定义canal客户端的难度。

不过遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是1.1.3版本的,不过实测还不影响我们对接canal1.1.6。如果大家对canal客户端又更高性能的需求,可以研究源码,高度二开。

后续我们将给大家讲解如何实现类canal-spring-boot-starter这样的第三方依赖组件。感兴趣的同学可以关注专栏。

Elastic实战:canal自定义客户端,实现mysql多表同步到es相关推荐

  1. canal应用二:mysql数据实时同步到redis

    前言 在项目开发中,通常使用redis作为数据的缓存,那么经常遇到一个问题,修改MySQL的数据要怎么同步到Redis呢? 方式一:在系统的保存.删除接口同时对redis进行操作,但是存在一个缺点,就 ...

  2. canal没有监听到mysql,缓存不能同步

    在商品管理页面localhost:8081修改ID为10001商品price,打开页面localhost:8081/item/10001查询,price并没有更新,打开RESP.app查看10001的 ...

  3. 【SpringBoot】65、SpringBoot整合Canal+RabbitMQ监听MySQL实现数据同步更新Redis缓存

    canal 简介 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行 ...

  4. mysql多表同步es一个索引_使用 Logstash 同步多个数据表到 Elasticsearch 同一个索引 (Index)...

    最近使用了 ELK 做搜索,出报表,由于我们的业务数据很多,汇总起来有近 200 个字段,刚开始写了一个庞大的视图通过 Logstash 同步到 Elasticsearch,但是遇到了很多问题: 1. ...

  5. 基于Kafka Debezium Confluent实现MySql实时同步到ES

    实现目标 基于MySql的Binlog实现Mysql表实时同步到ES. 实现方案          1.总体技术方案基于Kafka的Connect技术.具体技术内容,不做介绍,网上有相关文章,本文章主 ...

  6. 使用canal监听binlog将数据发送到RocketMQ同步到es

    写在前面 今天不学习,明天变垃圾.最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程 ...

  7. mysql 分词搜索_实战 | canal 实现Mysql到Elasticsearch实时增量同步

    题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...

  8. 增量同步_实战 | canal 实现Mysql到Elasticsearch实时增量同步

    题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...

  9. 【.NET Core项目实战-统一认证平台】第七章 网关篇-自定义客户端限流

    上篇文章我介绍了如何在网关上增加自定义客户端授权功能,从设计到编码实现,一步一步详细讲解,相信大家也掌握了自定义中间件的开发技巧了,本篇我们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础 ...

最新文章

  1. Android四大基本组件介绍与生命周期
  2. Win7环境下搭建GO开发平台——SublimeText 2
  3. java 基础(匿名内部类)
  4. 重庆python培训-重庆python培训机构排.行榜
  5. sqlserver执行更新语句失败报错42S22
  6. python填充空值_python空值_python空值填充_python空值变量 - 云+社区 - 腾讯云
  7. 预约 .NET Conf: Focus on F# 活动,赢得官方周边!
  8. HEVC 编解码资源
  9. windows下mongoDB的环境配置
  10. JavaScript Module Pattern
  11. ApacheHttpServer出现启动报错:the requested operation has failed解决办法
  12. 天梯赛L2-10:排座位
  13. vector中push_back和emplace_back区别
  14. Microsoft Offfice 2010 测试版下载
  15. 运城达内java毕业生分享如何提高网站优化效率
  16. 计算机视觉论文-2021-03-03
  17. 吴裕雄--天生自然 诗经:兵车行
  18. 2012开源中国开源世界高峰论坛有感
  19. 游戏建模在国内的发展前景,3D建模行业真的很缺人吗?
  20. el-form-item 正则验证

热门文章

  1. 刘克亚励志演讲(清晨/夜晚)
  2. 2019最新Android常用开源库总结(From:知乎)
  3. 新浪微博松绑140字限制 内容和社交能否兼得?
  4. 三星手机安装linux系统下载,ubuntu手机系统安装教程【详细步骤】
  5. Android”挂逼”修炼之行—微信摇骰子和猜拳作弊器原理解析
  6. Google Earth Engine(GEE)——逐月降水数据下载和直方图表展示
  7. 农家女靠养花赚钱,年收入几十万
  8. 英语作文计算机国际会议开幕词,学术会议开幕词英文.doc
  9. 如何搭建数据指标监测体系?
  10. c语言printf输出字符表情,C语言中printf输出的奇怪错误