canal实现mysql同步Elasticsearch数据linux中安装ELK
linux中ELK数据同步
- 前言
- ElasticSearch安装
- 下载安装包&&添加es用户
- 启动
- ElasticSearch HEAD安装
- 下载&&安装
- 配置
- 设置跨域
- 启动ElasticSearch HEAD
- Kibana安装
- 下载&&安装
- 配置
- canal安装
- 介绍
- mysql配置
- 下载&&安装
- deployer测试
- 搭建SpringBoot框架&&引入依赖
- ClientSample代码
- adapter测试
- 问题
- 索引找不到
- 找不到es类型
- 多表SQL关联异常
前言
在之前有说过在windows中安装elasticsearch和kibana 但是没有过多的介绍linux中使用 现在我想大部分的公司所使用的服务器都是linux,当然也不会排除少数的windows系统 刚好我这边之前在阿里云购买了一台linux服务器 在大数据时代 elasticsearch友好的帮助了查询上的优化 但是如果说我想要把mysql中的数据同步实现在elasticsearch中 那么可以用到一些工具来实现 比如elk中的logstation以及canal等。今天我将用canal来对mysql同步es的实现,所以我这边将不会介绍logstation的安装以及使用 将使用canal来替代。有想去了解的可以去其他地方查阅资料。
ElasticSearch安装
下载安装包&&添加es用户
首先下载很简单,直接步入官网选择系统即可,链接如下
https://www.elastic.co/cn/products/elasticsearch
下载后直接找一个文件夹下放入解压即可,我下载的是8.1.2版本的,解压指令我放入下面,下载好后添加一个es用户 指令也在下面,
注意:后面发现版本太高对JDK的有一定的要求,8以上的版本仅支持JDK17 故所有请尽量选择低版本es,我这里有6.6.0版本的链接,此链接支持1.8版本 最好使用此版本,步骤一致 只是我下面介绍的版本不同
请在下面的kibana也使用6.6.0版本!!!
https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-6-0
//cd到指定目录下解压
tar -zxvf elasticsearch-8.1.2-linux-x86_64.tar.gz
//添加es用户
useradd esroot
//设置密码
passwd esuser
上面的操作完成后说明你的准备工作已完成,下面进入解压后的config文件夹中 因为有两个地方需要修改一下
cd /java/es/elasticsearch-8.1.2/config
//elasticsearch.yml需要修改的地方
node.name: node-1 #配置当前es节点名称(默认是被注释的,并且默认有一个节点名)
cluster.name: my- application #默认是被注释的,并且默认有一个集群名
path.data: /home/es/data # 数据目录位置
path.logs: /home/es/logs # 日志目录位置
network.host: 0.0.0.0 #绑定的ip:默认只允许本机访问,修改为0.0.0.0后则可以远程访问
cluster.initial_master_nodes: ["node-1","node-2"] #默认是被注释的 设置master节点列表 用逗号分隔
Elasticsearch基于Lucene的,而Lucene底层是java实现,因此我们需要配置jvm参数。编辑jvm.options
修改配置为
修改/etc/security/limits.conf文件 增加配置
vi /etc/security/limits.conf
在文件最后,增加如下配置:
* soft nofile 65536
* hard nofile 65536
wq!退出
在/etc/sysctl.conf文件最后添加一行 vm.max_map_count=655360 添加完毕之后,执行命令: sysctl -p
启动
上面这些完成后基本上就完成了es的安装。下面就看启动上了。既然是新用户,那么我们肯定要授权 将es文件夹下的所有目录的所有权限迭代给esuser用户
chgrp -R esroot ./es
chown -R esroot ./es
chmod 777 es
进入es文件夹下执行./bin/elasticsearch即可启动了,后台启动在执行后面加一个 -d即可也就是./bin/elasticsearch -d
注意:elasticSearch对系统配置有一定要求 我之前的1核2G已趟过浑水 启动后直接性能爆满 所以系统至少要在2核8G以上在尝试
后面我将服务器调至上面的2核8G的勉强还是可以运行起来的,我版本后面是更换至了7.6.1版本 因为我服务器的JDK是1.8的,所以只能是使用8以下的版本,访问成功!
ElasticSearch HEAD安装
下载&&安装
地址:
https://github.com/mobz/elasticsearch-head
解压
unzip elasticsearch-head.zip
配置
进入head文件下执行:
npm install
如果是执行报错在后面加一个-g即可
设置跨域
修改es的elasticsearch.yml文件
//进入查看文件
vim /java/elk/elasticsearch-7.6.1/config/elasticsearch.yml//修改配置
#配置开启跨域
http.cors.enabled: true
#配置允许任何域名访问
http.cors.allow-origin: "*"
修改完es配置后需要重启es,切换es账号输入指令ps -ef|grep elasticsearch查看es的端口号
然后直接kill掉即可 执行./bin/elasticsearch -d指令启动es即可
启动ElasticSearch HEAD
在后台启动指令如下
nohup npm run start &
启动后可查看9100端口号
到这里可视化工具就安装好了 直接访问试试
注意:连接那里填入linux的ip,不然是连接本地
Kibana安装
下载&&安装
上面安装的elasticsearch也是7.6.1版本 所以kibana也是此版本 版本最好保持一致
下载地址:
https://www.elastic.co/cn/downloads/past-releases/kibana-7-6-1
解压
tar -zxvf kibana-7.6.1-linux-x86_64.tar.gz
修改以下三个地方
#kibana端口号
server.port: 5601
//远程连接要这个 不然远程连接不了
server.host: "0.0.0.0"
//ip
elasticsearch.hosts: ["http://这里写你的ip:9200"]
之前创建了一个esroot的用户 现在也要用这个用户来启动 如果非要root启动就要用以下指令
./kibana --allow-root
设置权限并启动
#文件权限
chown -R esroot /java/elk/kibana
# 转用户
su esroot
#在kibana的bin下执行指令(后台登陆 不后台登陆就不要&即可)
./bin/kibana &
启动后连接试试:
http://你的ip地址:5601/app/kibana
配置
canal安装
介绍
译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
mysql配置
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:阿里云RDS for MySQL,默认打开了binlog,并且默认有binlog dump权限,没有任何权限binlog设置,可以跳过这一步
授权渠道 链接 MySQL 账户 作为 MySQL slave 的权限,如果现有账户可以直接授予
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
重启mysql
一、 启动
使用 service 启动:service mysql start
二、停止
使用 service 启动:service mysql stop
三、重启
使用 service 启动:service mysql restart
下载&&安装
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
根据官方文档,目标系统支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。
下载路径:
https://github.com/alibaba/canal/releases
下载canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz)
deployer测试
修改/deployer/conf/example目录下的instance.properties文件以下内容即可
然后进入bin目录下执行startup.sh即可开启
搭建SpringBoot框架&&引入依赖
引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
ClientSample代码
ClientSample代码:https://github.com/alibaba/canal/wiki/ClientExample
上面链接是阿里巴巴给的源代码,也就是下面的代码 记得修改ip为adapter的
package com.example.canal.test;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/*** @author: Fujii* @date: 4/13/2022 8:19 PM* @description:*/
public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("这里填入adapter的ip",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}}
然后直接运行main,运行后我在一个测试表中添加一条数据试试
可在日志处观察到变更信息,标志着deployer监听mysql binlog变更成功
上面是deployer的效果 下面我们再来看看adapter
adapter测试
canal adapter 的 Elastic Search 版本支持6.x.x以上,adapter的目录和deployer很相似,主要需要修改的地方也是application.yml文件 yml文件对格式的要求很高 有一点偏差都会出现错误,所以我这边直接远程下载到本地使用notepad工具来编写 当然有能力的小伙伴也可以使用下面的指令进行编写,看自己的习惯吧,我个人比较喜欢下载到本地
vi /java/canal/adapter/conf/application.yml
下面是官方文档地址,也就是我下面这段yml文件。可以参考如下:
https://github.com/alibaba/canal/wiki/Sync-ES
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts: # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:url: jdbc:mysql://你的ip地址:3306/campus?useUnicode=true&charaterEncoding=utf-8&useSSL=falseusername: canalpassword: canalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: es6hosts: 你的ip地址:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or restsecurity.auth: elastic:elastic # only used for rest modecluster.name: name #es名称
adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件,不需要的文件可删除,只配置需要的yml文件。修改如下配置,esMapping信息,包括 index,type,sql,其中sql尽量保证不要换行,在文本编辑器中编辑成一行,在粘贴进去,否则可能会出问题(还是由于yml格式问题)
此处的sql的字段别名即是es字段名,不写别名,默认原名即是es字段名,有关详细说明,可参见官方文档
https://github.com/alibaba/canal/wiki/Sync-ES
etlCondition 可以注释掉,我们默认任何条件都同步
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: student # es 的索引名称_id: _id # 将Mysql表里的id对应上es上的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配_type: _docupsert: truesql: "SELECTid AS _id,name,ageFROMstudent" # sql映射#etlCondition: "where p.id>={}" #etl的条件参数commitBatch: 3000 # 提交批大小
编辑好yml文件后直接在文件夹下执行以下指令
bin/startup.sh
启动后如下图说明启动成功!
启动后可在mysql尝试插入数据 在adapter.log文件中查看到监听信息,下面先来在数据库添加一条数据试试
下面打开adapter.log的实时日志监控可以看到有信息输出出来,这里是有两条 第一条是监听读取mysql的数据 第二条是添加进入es的信息
查看es 我这里用的head可视化工具 所以可以直接查看
问题
索引找不到
ESSyncService - sync error, es index: mytest_user
1.可能是es还没创建该索引
2.es版本号太高,目前canal支持7.以下的版本 所以最好是6.的版本
找不到es类型
ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type
1.加上_type: _doc
目前我在配置中是遇到这两个问题,还是比较顺利吧 在版本号这里花了不少时间 后面在网上看了些资料才知道的 好了 这期就到这
多表SQL关联异常
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.fastsql.sql.parser.ParserException
解决方案:关联的id加上即可
结束语
若要前行,就要离开你现在停留的地方!
canal实现mysql同步Elasticsearch数据linux中安装ELK相关推荐
- linux mysql授权外部访问权限,Linux中安装Mysql授权远程访问
Linux中安装MySQL 因为使用yum安装.安装过程需保证网络通畅 一.安装mysql 1.yum安装mysqlCentOS7默认数据库是mariadb,配置等用着不习惯,因此决定改成mysql, ...
- 使用canal解决Mysql和Redis数据同步问题
前言 千呼万唤始出来,停了好个月,终于又开始动手写文章了,今天带给大家的是阿里的一个工具Canal,这个工具是企业做数据同步使用的比较多的方案,希望对你有所帮助,喜欢的话请给个好评 工作原理分析 我们 ...
- mysql 分词搜索_实战 | canal 实现Mysql到Elasticsearch实时增量同步
题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...
- 增量同步_实战 | canal 实现Mysql到Elasticsearch实时增量同步
题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...
- Canal 实现 Mysql数据库实时数据同步
简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...
- 用navicat访问linux数据库,成功实现Navicat访问Linux中安装的MySQL数据库
我们在Linux中安装完数据库(MySQL)后,使用navicat去进行远程连接时会报以下错误 这主要是因为我们没有给权限,所以外部主机没有权限访问linux中的MySQL 我们只需要登陆linux中 ...
- 怎么在linux卸载mysql,在linux中安装和卸载mysql
[安装] 已经获取到linux版本的mysql安装包,包括mysql的server(服务端)和client(客户端)的安装包,假设安装包为: MySQL-server-5.0.22-0.i386.rp ...
- hive安装需要安装mysql区别_HIVE安装系列之一:在Linux中安装mysql,为其作为hive的metastore做准备...
安装mysql的Linux机器是Centos6的系统,机器名字叫combanc05 mysql我采用的是5.5版本. 安装过程中需要解决新旧版的冲突问题,并允许mysql被远程访问.以便其作为hive ...
- 在Linux中安装Pentaho Server 9.1并使用MySQL作为存储库
在Linux中安装Pentaho Server 9.1并使用MySQL作为存储库 一.本文环境 应用名称 CentOS Linux Pentaho Server CE MySQL JDK 应用版本 7 ...
最新文章
- COALESCE语句解救sql的sum问题
- php类属性命名驼峰还是下划线,PHP实现驼峰命名和下划线命名互转
- 项目实践精解:ASP.NET应用开发
- 查询排序_MySQL使用UNION连接两个查询排序失效
- Oracle入门(十四)之PL/SQL
- Jenkins时区设置为北京时间
- 完整nagios安装最新pnp版绘图-sync模式
- java高级流程控制多线程作业设计_Java高级-解析Java中的多线程机制
- hadoop环境准备-大数据Week5-DAY6-1-hadoop
- 用ghost备份和还原Linux系统(一)
- STM32F4开发-新建工程
- Java 批量下载图片并压缩为Zip
- SQL语句之查询进阶篇---上
- 成本最低的Elance提现方式 — Moneybookers
- 51.com新版上线 正式推出开放平台
- Nosql - redis 的学习
- 云宏刘建平:细说中小企业如何上云
- PTA 7-2 一帮一
- Markdown语法010:脚注
- matlab中单独存图_奇怪的Matlab画图技巧系列1–保存高清大图
热门文章
- 10 行代码,9 行报错,8 个警告…
- 学系统集成项目管理工程师(中项)系列21a_整体管理(上)
- 【NumPy 数组过滤、NumPy 中的随机数、NumPy ufuncs】
- Python实现王者荣耀小助手(一)
- Java项目:图书进销存管理系统(java+SSM+JSP+bootstrap+Mysql)
- linux下载python numba,安装numba和使用numba加速python程序
- 转:《岳阳楼记》之股市篇
- 通达OA v11.3 以下版本 任意文件上传加文件包含导致命令执行漏洞在线实验环境
- 计算机专业在职研究生复试要考哪些科目,计算机在职研究生考试科目有哪些
- UG编程二次开粗的应用与技巧