linux中ELK数据同步

  • 前言
  • ElasticSearch安装
    • 下载安装包&&添加es用户
    • 启动
  • ElasticSearch HEAD安装
    • 下载&&安装
    • 配置
    • 设置跨域
    • 启动ElasticSearch HEAD
  • Kibana安装
    • 下载&&安装
    • 配置
  • canal安装
    • 介绍
    • mysql配置
    • 下载&&安装
    • deployer测试
    • 搭建SpringBoot框架&&引入依赖
    • ClientSample代码
    • adapter测试
  • 问题
    • 索引找不到
    • 找不到es类型
    • 多表SQL关联异常

前言

在之前有说过在windows中安装elasticsearch和kibana 但是没有过多的介绍linux中使用 现在我想大部分的公司所使用的服务器都是linux,当然也不会排除少数的windows系统 刚好我这边之前在阿里云购买了一台linux服务器 在大数据时代 elasticsearch友好的帮助了查询上的优化 但是如果说我想要把mysql中的数据同步实现在elasticsearch中 那么可以用到一些工具来实现 比如elk中的logstation以及canal等。今天我将用canal来对mysql同步es的实现,所以我这边将不会介绍logstation的安装以及使用 将使用canal来替代。有想去了解的可以去其他地方查阅资料。

ElasticSearch安装

下载安装包&&添加es用户

首先下载很简单,直接步入官网选择系统即可,链接如下

https://www.elastic.co/cn/products/elasticsearch

下载后直接找一个文件夹下放入解压即可,我下载的是8.1.2版本的,解压指令我放入下面,下载好后添加一个es用户 指令也在下面,
注意:后面发现版本太高对JDK的有一定的要求,8以上的版本仅支持JDK17 故所有请尽量选择低版本es,我这里有6.6.0版本的链接,此链接支持1.8版本 最好使用此版本,步骤一致 只是我下面介绍的版本不同
请在下面的kibana也使用6.6.0版本!!!

https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-6-0
//cd到指定目录下解压
tar -zxvf elasticsearch-8.1.2-linux-x86_64.tar.gz

//添加es用户
useradd esroot
//设置密码
passwd esuser

上面的操作完成后说明你的准备工作已完成,下面进入解压后的config文件夹中 因为有两个地方需要修改一下

cd /java/es/elasticsearch-8.1.2/config

//elasticsearch.yml需要修改的地方
node.name: node-1 #配置当前es节点名称(默认是被注释的,并且默认有一个节点名)
cluster.name: my-   application #默认是被注释的,并且默认有一个集群名
path.data: /home/es/data # 数据目录位置
path.logs: /home/es/logs # 日志目录位置
network.host: 0.0.0.0   #绑定的ip:默认只允许本机访问,修改为0.0.0.0后则可以远程访问
cluster.initial_master_nodes: ["node-1","node-2"] #默认是被注释的 设置master节点列表 用逗号分隔

Elasticsearch基于Lucene的,而Lucene底层是java实现,因此我们需要配置jvm参数。编辑jvm.options
修改配置为

修改/etc/security/limits.conf文件 增加配置
vi /etc/security/limits.conf
在文件最后,增加如下配置:

* soft nofile 65536
* hard nofile 65536


wq!退出
在/etc/sysctl.conf文件最后添加一行 vm.max_map_count=655360 添加完毕之后,执行命令: sysctl -p

启动

上面这些完成后基本上就完成了es的安装。下面就看启动上了。既然是新用户,那么我们肯定要授权 将es文件夹下的所有目录的所有权限迭代给esuser用户

chgrp -R esroot ./es
chown -R esroot ./es
chmod 777 es

进入es文件夹下执行./bin/elasticsearch即可启动了,后台启动在执行后面加一个 -d即可也就是./bin/elasticsearch -d
注意:elasticSearch对系统配置有一定要求 我之前的1核2G已趟过浑水 启动后直接性能爆满 所以系统至少要在2核8G以上在尝试

后面我将服务器调至上面的2核8G的勉强还是可以运行起来的,我版本后面是更换至了7.6.1版本 因为我服务器的JDK是1.8的,所以只能是使用8以下的版本,访问成功!

ElasticSearch HEAD安装

下载&&安装

地址:

https://github.com/mobz/elasticsearch-head

解压

unzip elasticsearch-head.zip

配置

进入head文件下执行:

npm install

如果是执行报错在后面加一个-g即可

设置跨域

修改es的elasticsearch.yml文件

//进入查看文件
vim /java/elk/elasticsearch-7.6.1/config/elasticsearch.yml//修改配置
#配置开启跨域
http.cors.enabled: true
#配置允许任何域名访问
http.cors.allow-origin: "*"

修改完es配置后需要重启es,切换es账号输入指令ps -ef|grep elasticsearch查看es的端口号

然后直接kill掉即可 执行./bin/elasticsearch -d指令启动es即可

启动ElasticSearch HEAD

在后台启动指令如下

nohup npm run start &

启动后可查看9100端口号

到这里可视化工具就安装好了 直接访问试试

注意:连接那里填入linux的ip,不然是连接本地

Kibana安装

下载&&安装

上面安装的elasticsearch也是7.6.1版本 所以kibana也是此版本 版本最好保持一致
下载地址:

https://www.elastic.co/cn/downloads/past-releases/kibana-7-6-1

解压

tar -zxvf kibana-7.6.1-linux-x86_64.tar.gz

修改以下三个地方

#kibana端口号
server.port: 5601
//远程连接要这个 不然远程连接不了
server.host: "0.0.0.0"
//ip
elasticsearch.hosts: ["http://这里写你的ip:9200"]

之前创建了一个esroot的用户 现在也要用这个用户来启动 如果非要root启动就要用以下指令

./kibana --allow-root

设置权限并启动

#文件权限
chown -R esroot /java/elk/kibana
# 转用户
su esroot
#在kibana的bin下执行指令(后台登陆 不后台登陆就不要&即可)
./bin/kibana &

启动后连接试试:

http://你的ip地址:5601/app/kibana

配置

canal安装

介绍

译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

mysql配置

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:阿里云RDS for MySQL,默认打开了binlog,并且默认有binlog dump权限,没有任何权限binlog设置,可以跳过这一步

授权渠道 链接 MySQL 账户 作为 MySQL slave 的权限,如果现有账户可以直接授予

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

重启mysql
一、 启动
使用 service 启动:service mysql start
二、停止

使用 service 启动:service mysql stop

三、重启
使用 service 启动:service mysql restart

下载&&安装

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
根据官方文档,目标系统支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。
下载路径:

https://github.com/alibaba/canal/releases


下载canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz)

deployer测试

修改/deployer/conf/example目录下的instance.properties文件以下内容即可

然后进入bin目录下执行startup.sh即可开启

搭建SpringBoot框架&&引入依赖


引入依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>

ClientSample代码

ClientSample代码:https://github.com/alibaba/canal/wiki/ClientExample
上面链接是阿里巴巴给的源代码,也就是下面的代码 记得修改ip为adapter的

package com.example.canal.test;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/*** @author: Fujii* @date: 4/13/2022 8:19 PM* @description:*/
public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("这里填入adapter的ip",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}}

然后直接运行main,运行后我在一个测试表中添加一条数据试试

可在日志处观察到变更信息,标志着deployer监听mysql binlog变更成功

上面是deployer的效果 下面我们再来看看adapter

adapter测试

canal adapter 的 Elastic Search 版本支持6.x.x以上,adapter的目录和deployer很相似,主要需要修改的地方也是application.yml文件 yml文件对格式的要求很高 有一点偏差都会出现错误,所以我这边直接远程下载到本地使用notepad工具来编写 当然有能力的小伙伴也可以使用下面的指令进行编写,看自己的习惯吧,我个人比较喜欢下载到本地

vi /java/canal/adapter/conf/application.yml

下面是官方文档地址,也就是我下面这段yml文件。可以参考如下:

https://github.com/alibaba/canal/wiki/Sync-ES
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts:    # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:url: jdbc:mysql://你的ip地址:3306/campus?useUnicode=true&charaterEncoding=utf-8&useSSL=falseusername: canalpassword: canalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: es6hosts: 你的ip地址:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or restsecurity.auth: elastic:elastic #  only used for rest modecluster.name: name #es名称

adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件,不需要的文件可删除,只配置需要的yml文件。修改如下配置,esMapping信息,包括 index,type,sql,其中sql尽量保证不要换行,在文本编辑器中编辑成一行,在粘贴进去,否则可能会出问题(还是由于yml格式问题)
此处的sql的字段别名即是es字段名,不写别名,默认原名即是es字段名,有关详细说明,可参见官方文档

https://github.com/alibaba/canal/wiki/Sync-ES

etlCondition 可以注释掉,我们默认任何条件都同步

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: student # es 的索引名称_id: _id  # 将Mysql表里的id对应上es上的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配_type: _docupsert: truesql: "SELECTid AS _id,name,ageFROMstudent"        # sql映射#etlCondition: "where p.id>={}"   #etl的条件参数commitBatch: 3000   # 提交批大小

编辑好yml文件后直接在文件夹下执行以下指令

bin/startup.sh

启动后如下图说明启动成功!

启动后可在mysql尝试插入数据 在adapter.log文件中查看到监听信息,下面先来在数据库添加一条数据试试


下面打开adapter.log的实时日志监控可以看到有信息输出出来,这里是有两条 第一条是监听读取mysql的数据 第二条是添加进入es的信息
查看es 我这里用的head可视化工具 所以可以直接查看

问题

索引找不到

ESSyncService - sync error, es index: mytest_user

1.可能是es还没创建该索引
2.es版本号太高,目前canal支持7.以下的版本 所以最好是6.的版本

找不到es类型

ERROR c.a.o.c.client.adapter.es.core.monitor.ESConfigMonitor - esMapping._type

1.加上_type: _doc

目前我在配置中是遇到这两个问题,还是比较顺利吧 在版本号这里花了不少时间 后面在网上看了些资料才知道的 好了 这期就到这

多表SQL关联异常

java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.fastsql.sql.parser.ParserException

解决方案:关联的id加上即可

结束语
若要前行,就要离开你现在停留的地方!

canal实现mysql同步Elasticsearch数据linux中安装ELK相关推荐

  1. linux mysql授权外部访问权限,Linux中安装Mysql授权远程访问

    Linux中安装MySQL 因为使用yum安装.安装过程需保证网络通畅 一.安装mysql 1.yum安装mysqlCentOS7默认数据库是mariadb,配置等用着不习惯,因此决定改成mysql, ...

  2. 使用canal解决Mysql和Redis数据同步问题

    前言 千呼万唤始出来,停了好个月,终于又开始动手写文章了,今天带给大家的是阿里的一个工具Canal,这个工具是企业做数据同步使用的比较多的方案,希望对你有所帮助,喜欢的话请给个好评 工作原理分析 我们 ...

  3. mysql 分词搜索_实战 | canal 实现Mysql到Elasticsearch实时增量同步

    题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...

  4. 增量同步_实战 | canal 实现Mysql到Elasticsearch实时增量同步

    题记 关系型数据库Mysql/Oracle增量同步Elasticsearch是持续关注的问题,也是社区.QQ群等讨论最多的问题之一. 问题包含但不限于: 1.Mysql如何同步到Elasticsear ...

  5. Canal 实现 Mysql数据库实时数据同步

    简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统.Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数 ...

  6. 用navicat访问linux数据库,成功实现Navicat访问Linux中安装的MySQL数据库

    我们在Linux中安装完数据库(MySQL)后,使用navicat去进行远程连接时会报以下错误 这主要是因为我们没有给权限,所以外部主机没有权限访问linux中的MySQL 我们只需要登陆linux中 ...

  7. 怎么在linux卸载mysql,在linux中安装和卸载mysql

    [安装] 已经获取到linux版本的mysql安装包,包括mysql的server(服务端)和client(客户端)的安装包,假设安装包为: MySQL-server-5.0.22-0.i386.rp ...

  8. hive安装需要安装mysql区别_HIVE安装系列之一:在Linux中安装mysql,为其作为hive的metastore做准备...

    安装mysql的Linux机器是Centos6的系统,机器名字叫combanc05 mysql我采用的是5.5版本. 安装过程中需要解决新旧版的冲突问题,并允许mysql被远程访问.以便其作为hive ...

  9. 在Linux中安装Pentaho Server 9.1并使用MySQL作为存储库

    在Linux中安装Pentaho Server 9.1并使用MySQL作为存储库 一.本文环境 应用名称 CentOS Linux Pentaho Server CE MySQL JDK 应用版本 7 ...

最新文章

  1. COALESCE语句解救sql的sum问题
  2. php类属性命名驼峰还是下划线,PHP实现驼峰命名和下划线命名互转
  3. 项目实践精解:ASP.NET应用开发
  4. 查询排序_MySQL使用UNION连接两个查询排序失效
  5. Oracle入门(十四)之PL/SQL
  6. Jenkins时区设置为北京时间
  7. 完整nagios安装最新pnp版绘图-sync模式
  8. java高级流程控制多线程作业设计_Java高级-解析Java中的多线程机制
  9. hadoop环境准备-大数据Week5-DAY6-1-hadoop
  10. 用ghost备份和还原Linux系统(一)
  11. STM32F4开发-新建工程
  12. Java 批量下载图片并压缩为Zip
  13. SQL语句之查询进阶篇---上
  14. 成本最低的Elance提现方式 — Moneybookers
  15. 51.com新版上线 正式推出开放平台
  16. Nosql - redis 的学习
  17. 云宏刘建平:细说中小企业如何上云
  18. PTA 7-2 一帮一
  19. Markdown语法010:脚注
  20. matlab中单独存图_奇怪的Matlab画图技巧系列1–保存高清大图

热门文章

  1. 10 行代码,9 行报错,8 个警告…
  2. 学系统集成项目管理工程师(中项)系列21a_整体管理(上)
  3. 【NumPy 数组过滤、NumPy 中的随机数、NumPy ufuncs】
  4. Python实现王者荣耀小助手(一)
  5. Java项目:图书进销存管理系统(java+SSM+JSP+bootstrap+Mysql)
  6. linux下载python numba,安装numba和使用numba加速python程序
  7. 转:《岳阳楼记》之股市篇
  8. 通达OA v11.3 以下版本 任意文件上传加文件包含导致命令执行漏洞在线实验环境
  9. 计算机专业在职研究生复试要考哪些科目,计算机在职研究生考试科目有哪些
  10. UG编程二次开粗的应用与技巧