文章目录

  • 一、软件安装
    • 1. jdk 安装
    • 2. ES7.15.2 安装
    • 3. Mysql 8.0 安装
    • 4. canal下载
  • 二、Mysql 配置
    • 2.1. 开启binlog
    • 2.2. 验证binlog状态
    • 2.3. 创建账号
    • 2.4. 权限赋予
    • 2.5. 刷新权限
    • 2.6. 创建数据库
    • 2.7. 初始化表结构
    • 2.8. 初始化数据
  • 三、canal-deployer的配置与使用
    • 3.1. 解压deployer
    • 3.2. 解压后目录结构
    • 3.3. 修改配置
    • 3.4. 启动deployer
  • 四、canal-adaptor的使用和配置
    • 4.1. 解压adaptor
    • 4.2. 解压后目录结构
    • 4.3. 修改配置
    • 4.3. 新建配置
    • 4.4. 启动adapter
    • 4.5. 查看adapter日志
    • 4.6. 异常方案
  • 五、中间件启动
    • 5.1. 启动es
    • 5.2. 启动kibana
    • 5.3. 启动canal.adapter
  • 六、全量同步(demo)
    • 6.1. 创建索引
    • 6.2. 执行全量同步
  • 七、全量同步(企业)
    • 7.1. curl命令创建索引
    • 7.2. kibana创建索引
    • 7.3. 执行全量同步url
    • 7.4. kibana查询数据
    • 7.5. canal 增量同步
一、软件安装
软件 版本
MySQL 8.0.26
ElasticSearch 7.15.2
jdk 1.8.0_202
1. jdk 安装

jdk 安装 linux环境

2. ES7.15.2 安装

Elasticsearch7.15.2 安装

3. Mysql 8.0 安装

Mysql 8.0 安装教程 Linux Centos7

4. canal下载

使用canal前请先确保安装了mysql、elasticsearch、jdk

  • canal简介
    canal是阿里巴巴开源的MySQL binlog 增量订阅&消费组件。
    canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
    canal的GitHub地址: GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

  • canal 下载
    Releases · alibaba/canal · GitHub
    建议下载以下四个。其中下载源码是为了解决canal.adapter中的一个依赖冲突,要保存在自己开发用的电脑里,后面会讲到它。

  • canal 组件用用简述:
    canal的各个组件的用途各不相同,下面分别介绍下:
    canal-deploy:用于监听MySQL的binlog,是一个伪装的MySQL从库,只负责从MySQL主库接收数据,不做处理。
    canal-adapter:canal的客户端,从canal-deploy中获取数据,然后同步数据到目标数据源,我们用它将数据存储到ElasticSearch中。
    canal-admin:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

二、Mysql 配置
2.1. 开启binlog

先配置mysql数据库,打开binlog写入功能,设置binlog-format为ROW。
配置文件路径为 /etc/my.cnf

vi /etc/my.cnf

直接覆盖即可

[mysqld]
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4
symbolic-links=0
## 设置server_id,同一局域网中需要唯一
server-id=1
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format="ROW"
## 开启二进制日志功能
log_bin=/usr/local/mysql-8.0/mysql_bin
2.2. 验证binlog状态

配置完成后重新启动MySQL,重启后执行下面的SQL语句查看binlog是否启用:

systemctl restart mysql
mysql -uroot -p123456
show variables like '%log_bin%';


详情参考:MySQL8.0.26 开启bin_log日志 linux

2.3. 创建账号

接下来需要创建一个拥有从库权限的账号,用于订阅binlog,这里创建的账号为canal:canal;

CREATE USER canal IDENTIFIED BY 'canal';
2.4. 权限赋予
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
2.5. 刷新权限
FLUSH PRIVILEGES;
2.6. 创建数据库
create database dianpingdb;
2.7. 初始化表结构

任选一种,或者都初始化

  • demo 案例:
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (`id` varchar(32) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL,`name` varchar(100) CHARACTER SET utf8 COLLATE utf8_unicode_ci NULL DEFAULT NULL,`grade` tinyint(1) NULL DEFAULT 1,`class` tinyint(1) NULL DEFAULT 0,`birthday` date NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_unicode_ci ROW_FORMAT = Dynamic;
  • 真实企业案例:
    该表结构取自真实开源大众点评项目,直接下载初始化即可
    https://gitee.com/gb_90/dianping/tree/master/doc/dianping.sql
2.8. 初始化数据

任选一种,或者都初始化

  • demo 案例数据:
INSERT INTO `student` VALUES ('1', '大家好,我是gblfy', 1, 0, '2021-11-23');
INSERT INTO `student` VALUES ('2', '大家好,我是gblfy2', 1, 0, '2021-11-23');
INSERT INTO `student` VALUES ('3', '大家好,我是gblfy3', 1, 0, '2021-11-23');
INSERT INTO `student` VALUES ('4', '大家好,我是gblfy4', 1, 0, '2021-11-23');
INSERT INTO `student` VALUES ('5', '大家好,我是gblfy5', 1, 0, '2021-11-23');
  • 真实企业案例数据:
    该表结构取自真实开源大众点评项目,直接下载初始化即可
    https://gitee.com/gb_90/dianping/tree/master/doc/dml.sql
三、canal-deployer的配置与使用

3.1. 解压deployer
mkdir /app/canal/canal.deployer -p
cd /app/canal/
tar -zxvf canal.deployer-1.1.5.tar.gz -C /app/canal/canal.deployer
3.2. 解压后目录结构
.
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   ├── example
│   │   └── instance.properties
│   ├── logback.xml
│   ├── metrics
│   │   └── Canal_instances_tmpl.json
│   └── spring
├── lib
├── logs
└── plugin
3.3. 修改配置

修改配置文件conf/example/instance.properties

cd /app/canal/canal.deployer/conf/example
vim instance.properties

修改下面几项配置:我这里只修改了slaveId其他的都可以默认

关键信息:

# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
3.4. 启动deployer
cd /app/canal/canal.deployer
bin/startup.sh && tail -f logs/canal/canal.log

四、canal-adaptor的使用和配置

4.1. 解压adaptor
mkdir  /app/canal/canal.adapter -p
cd /app/canal/
tar -zxvf canal.adapter-1.1.5.tar.gz -C /app/canal/canal.adapter
4.2. 解压后目录结构
.
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── bootstrap.yml
│   ├── es6
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── mytest_user.yml
│   ├── es7
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── mytest_user.yml
│   ├── hbase
│   │   └── mytest_person2.yml
│   ├── kudu
│   │   └── kudutest_user.yml
│   ├── logback.xml
│   ├── META-INF
│   │   └── spring.factories
│   └── rdb
│       └── mytest_user.yml
├── lib
├── logs
└── plugin├── client-adapter.es6x-1.1.5-jar-with-dependencies.jar├── client-adapter.es7x-1.1.5-jar-with-dependencies.jar├── client-adapter.hbase-1.1.5-jar-with-dependencies.jar├── client-adapter.logger-1.1.5-jar-with-dependencies.jar├── client-adapter.rdb-1.1.5-jar-with-dependencies.jar├── connector.kafka-1.1.5-jar-with-dependencies.jar├── connector.rabbitmq-1.1.5-jar-with-dependencies.jar├── connector.rocketmq-1.1.5-jar-with-dependencies.jar└── connector.tcp-1.1.5-jar-with-dependencies.jar
4.3. 修改配置

修改配置文件conf/application.yml

cd /app/canal/canal.adapter
vim conf/application.yml


4.3. 新建配置
cd /app/canal/canal.adapter/conf/es7/
  • demo案例:
vim student.yml

写入以下内容

dataSourceKey: defaultDS
destination: student
groupId: g1
esMapping:_index: student_type: _doc_id: _idsql: "SELECT id as _id,name,grade,class,birthdayfrom student"etlCondition: "where birthday>={}"commitBatch: 3000

  • 企业案例:
vim shop.yml

写入以下内容

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: shop_type: _doc_id: idupsert: truesql: "SELECT a.id,a.name,a.tags,CONCAT(a.latitude,',',a.longitude) AS location,a.remark_score,a.price_per_man,a.category_id,b
.`name` as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag FROM shop a
INNER JOIN category b on a.category_id =b.id INNER JOIN seller c on c.id = a.seller_id"commitBatch: 3000

4.4. 启动adapter
cd /app/canal/canal.adapter
bin/startup.sh
4.5. 查看adapter日志
tail -100f  /app/canal/canal.adapter/logs/adapter/adapter.log
4.6. 异常方案

看到有以下报错:

2021-11-21 18:11:29.223 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2021-11-21 18:11:29.223 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2021-11-21 18:11:29.224 [main] INFO  c.a.otter.canal.client.adapter.support.ExtensionLoader - extension classpath dir: /usr/local/canal_adapter/plugin
2021-11-21 18:11:29.245 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2021-11-21 18:11:29.441 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2021-11-21 18:11:29.493 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2021-11-21 18:11:29.747 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSourceat com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_292]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_292]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_292]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_292]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_292]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_292]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_292]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_292]at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[client-adapter.launcher-1.1.5.jar:na]
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSourceat com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]... 42 common frames omitted
Caused by: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSourceat com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:146) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]... 43 common frames omitted
2021-11-21 18:11:29.753 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /usr/local/canal_adapter/plugin
2021-11-21 18:11:29.772 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2021-11-21 18:11:29.772 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2021-11-21 18:11:29.777 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2021-11-21 18:11:29.781 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2021-11-21 18:11:29.785 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-11-21 18:11:29.907 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2021-11-21 18:11:29.909 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.1 seconds (JVM running for 3.499)
2021-11-21 18:11:30.045 [Thread-3] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

在百度上查了一下,找到了这篇文章canal本地运行异常:class com.alibaba.druid.pool.DruidDataSource cannot be cast to

确认是jar包冲突导致的bug,于是下载了源码压缩包canal-canal-1.1.5.zip 下载地址 https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.zip
解压缩后用自己的开发工具打开项目,修改maven依赖

       <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><scope>provided</scope></dependency>


项目根目录下执行mvn clean package,打包,然后到canal-canal-1.1.5\client-adapter\es7x\target\目录下找到client-adapter.es7x-1.1.5-jar-with-dependencies.jar

将这个文件复制到/app/canal/canal_adapter/plugin/下,重启adapter,查看日志

/app/canal/canal_adapter/bin/restart.sh
tail -100f  /app/canal/canal_adapter/logs/adapter/adapter.log

成功启动,不再出现报错。

五、中间件启动
5.1. 启动es
/app/elasticsearch-7.15.2bin/elasticsearch -d && tail -f logs/dianping-app.log

5.2. 启动kibana

如果没下载,可以参考:kibana-7.15.2 一分钟下载、安装、部署 linux,如不想安装直接跳过,也可以验证的,不影响效果

cd /app/kibana-7.15.2-linux-x86_64/
bin/kibana
5.3. 启动canal.adapter
cd /app/canal/canal.adapter
bin/startup.sh
六、全量同步(demo)

在Elasticsearch中创建索引,和MySQL中的student表相对应,可以直接使用postman或者curl命令,也可以安装kibana或者其它Elasticsearch管理插件

6.1. 创建索引
  • 第一种:curl命令
curl --location --request PUT 'http://192.168.159.134:9200/student' \
--header 'Content-Type: application/json' \
--data '{"mappings":{"properties": {"name": {"type": "text"},"grade":{"type": "short"},"class":{"type": "short"},"birthday": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||date_time_no_millis||strict_date_optional_time||epoch_millis"}}}}'

执行结果显示创建成功。

[root@localhost canal_adapter]# curl --location --request PUT 'http://192.168.159.134:9200/student' \
> --header 'Content-Type: application/json' \
> --data '{"mappings":{"properties": {"name": {"type": "text"},"grade":{"type": "short"},"class":{"type": "short"},"birthday": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||date_time_no_millis||strict_date_optional_time||epoch_millis"}}}}'{"acknowledged":true,"shards_acknowledged":true,"index":"student"}
[root@localhost canal_adapter]#
6.2. 执行全量同步
curl -X POST  http://192.168.159.134:8081/etl/es7/student.yml


返回结果:{"succeeded":true,"resultMessage":"导入ES 数据:5 条"}

七、全量同步(企业)
7.1. curl命令创建索引
curl --location --request PUT 'http://localhost:9200/shop' \
--header 'Content-Type: application/json' \
--data '{"settings": {"number_of_shards": 1,"number_of_replicas": 1}, "mappings": {"properties": {"id":{"type": "integer"},"name":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"tags":{"type": "text","analyzer": "whitespace","fielddata": true},"location":{"type": "geo_point"},"remark_score":{"type": "double"},"price_per_man":{"type": "integer"},"category_id":{"type": "integer"},"category_name":{"type": "keyword"},"seller_id":{"type": "integer"},"seller_remark_score":{"type": "double"},"seller_disabled_flag":{"type": "integer"}}}
}';


执行结果显示创建成功。

{"acknowledged":true,"shards_acknowledged":true,"index":"shop"}

下面给大家介绍kibana方式,kibana只是提供了一个图形化的页面控制台。

7.2. kibana创建索引

使用 kibana-7.15.2控制台

# 定义门店索引结构
PUT /shop
{"settings": {"number_of_shards": 1,"number_of_replicas": 1}, "mappings": {"properties": {"id":{"type": "integer"},"name":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"tags":{"type": "text","analyzer": "whitespace","fielddata": true},"location":{"type": "geo_point"},"remark_score":{"type": "double"},"price_per_man":{"type": "integer"},"category_id":{"type": "integer"},"category_name":{"type": "keyword"},"seller_id":{"type": "integer"},"seller_remark_score":{"type": "double"},"seller_disabled_flag":{"type": "integer"}}}
}
7.3. 执行全量同步url
curl -X POST  http://192.168.159.134:8081/etl/es7/shop.yml


返回结果:{"succeeded":true,"resultMessage":"导入ES 数据:16 条"}

7.4 使用kibana控制台查看全量同步的数据,不是必须的,只是看一下全量同步数据,可以跳过。

7.4. kibana查询数据

使用kibana控制台查看全量同步的数据

# 查询student索引
GET /student/_search# 查询shop索引
GET /shop/_search# 删除student索引
DELETE /student# 删除shop索引
DELETE  /shop


7.5. canal 增量同步

使用canal实现MySQL 8 增量同步数据到 ElasticSearch 7.15.2中

使用canal实现MySQL 8 全量同步数据到 ElasticSearch 7.15.2中 linux相关推荐

  1. 使用canal实现MySQL 8 增量同步数据到 ElasticSearch 7.15.2中 linux

    文章目录 一.清空控制台 1. 清空控制台 2. 修改数据 3. 监控数据 4. 数据变化 5. 索引查询 6. 预期性能评估 7. 增量同步分析 二.验证方案 2.1. 把shop索引删除 2.2. ...

  2. Mysql同步数据到Elasticsearch(实时Canal)

    这里只是作为一个想法,Canal有监听binlog文件的功能.所以简单看了一下Canal的入门使用. 后续Canal实时数据同步的功能希望不会被我阉割......当然有大佬已经实现或者有其他方法实现m ...

  3. Elasticsearch 的全量同步和增量同步

    (1)全量同步 什么是全量同步:将一个mysql的整个表的所有数据都同步到es中 常用插件是logstash-input-jdbc,logstash通过sql语句分区间对数据进行查询,然后输出到es进 ...

  4. 数据同步之全量同步与增量同步

    一.什么是数据同步 业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计. 为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库 ...

  5. MySQL主从恢复(全量恢复数据)

    前言 当mysql主从(一主一从模式)数据不同步,常规方式解决不掉,故全量恢复数据并同步数据. 发现问题 首先可以由mstaer status观察到主从已经未同步,其次slave status看到sl ...

  6. MongoDB 3.4 复制集全量同步改进

    3.2版本复制集同步的过程参考MongoDB 复制集同步原理解析 在 3.4 版本里 MongoDB 对复制集同步的全量同步阶段做了2个改进 在拷贝数据的时候同时建立所有的索引,在之前的版本里,拷贝数 ...

  7. Redis持久化机制 -全量同步与增量同步的区别

    全量同步与增量同步的区别 全量同步:就是每天定时(避开高峰期)或者采用一个周期实现将数据拷贝到一个地方也就是Rdb存储. 增量同步:比如采用对行为的操作实现对数据的同步,也就是AOF. 全量与增量的比 ...

  8. xtrabackup 实现mysql的全量备份与增量备份

    Percona XtraBackup是世界上唯一一款开源的免费MySQL热备份软件,可以为InnoDB和XtraDB数据库执行非阻塞备份. 使用Percona XtraBackup,可以获得以下好处: ...

  9. MySQL数据库全量、增量备份与恢复

    MySQL数据库全量.增量备份与恢复 数据库备份的重要性 在生产的环境中,数据的安全性是至关重要的,任何数据的丢失都可能产生严重的后果. 造成数据丢失的原因 程序错误 人为商店 计算机失败 磁盘失败 ...

  10. 331全量增量数据、同步ld

    -- 数据同步 1 全量数据同步 1.1 不带参数的实现方式 每次更新目标表的时候,先把目标表中的数据清空,然后用源表的数据插入目标表中 . 1.2 通过参数 ,会计期(一个会计期 = 1个月 ,格式 ...

最新文章

  1. 《动手学深度学习》中文第二版预览版发布
  2. mac os系统使用Visual Studio Code打开浏览器查看HTML文件
  3. python3-datetime 时间处理
  4. 中艺人脸识别考勤机使用方法_人脸识别考勤机的使用方法及注意事项 - 全文
  5. mysql linux c tar_linux下mysql的tar包离线安装
  6. php cors和jsonp,jsonp和CORS跨域实现
  7. pythonlocust使用方法_python locust 性能测试:locust安装和一些参数介绍
  8. Sql Create Function简单例子
  9. 4204. 构造矩阵
  10. Skype for Business Server 2015-01-基础环境-准备
  11. Linux重定向和管道的基础学习
  12. Android 应用资源(一)
  13. windows权限提升——烂土豆+dll劫持+引号路径+服务权限
  14. NPN和PNP的使用总结
  15. OpenCv图像处理实战——银行卡卡号识别
  16. 挂载命令 mount
  17. (Paper)Network in Network网络分析
  18. 许一世情 陪你 浪尽天涯
  19. 数字图像处理——拉普拉斯算子【像素级别处理】(python)
  20. 由<高十>纪录片想到的

热门文章

  1. word文档可以压缩大小吗,详细压缩步骤
  2. tensorflow下手写汉字识别及其可视化
  3. PostgreSQL Array 数组类型与 FreeSql 打出一套【组合拳】
  4. 这里带你了解IR2104驱动电路
  5. 【已解决】双系统启动选择的界面(GRUB)字体太小,Ubuntu登录用户界面字体太小
  6. linux 打开8000端口,Linux中如何开启8080端口供外界访问和开启允许对外访问的端口8000...
  7. Python摄氏度和华氏度的转换
  8. 枚举类(Enumerated types)介绍
  9. 如何用视频转换器把qlv格式转换mp4
  10. vue 播放m3u8视频