使用Logstash将MySQL同步到Elasticsearch
一、环境准备
搭建es和kibana,可以参考我前面写的文章
Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
二、环境搭建
将logstash安装包上传到Linux服务器下,安装logstash-input-jdbc和logstash-output-elasticsearch两个插件。
具体步骤如下:
### 上传logstash的安装包到/usr/local目录下
### 解压
tar -zxvf logstash-6.4.3.tar.gz
cd logstash-6.4.3
### 安装插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
### 在/usr/local目录下新建sql文件夹
mkdir /usr/local/sql
### 将mysql驱动jar包上传到该文件夹下
在SQL目录下新建文件
vi mysql_user.conf
input {jdbc {jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.46.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.1.102:3306/test"jdbc_user => "root"jdbc_password => "123456"schedule => "* * * * *"statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"use_column_value => truetracking_column_type => "timestamp"tracking_column => "update_time"last_run_metadata_path => "syncpoint_table"}
}output {elasticsearch {# ES的IP地址及端口hosts => ["192.168.112.150:9200"]# 索引名称 可自定义index => "user"# 需要关联的数据库中有有一个id字段,对应类型中的iddocument_id => "%{id}"document_type => "user"}stdout {# JSON格式输出codec => json_lines}
}
配置文件说明:
jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载
jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
jdbc_connection_string: mysql 地址
jdbc_user: mysql 用户
jdbc_password: mysql 密码
schedule: 执行 sql 时机,类似 crontab 的调度
statement: 要执行的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量
use_column_value: 使用递增列的值
tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手
sql脚本:
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`update_time` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
启动logstash
cd /usr/local/logstash-6.4.3
./bin/logstash -f /usr/local/sql/mysql_user.conf
在user表中插入一条数据:
logstash打印出了日志:
使用kibana查询:
logstash-input-jdbc原理
使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段
多文件方式同步ES数据
一个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置文件就可以了,假设我们有两个表 user和 student,对应两个配置文件 mysql_user.conf 和 mysql_student.conf,在 config/pipelines.yml 中配置即可。
user表和mysql_user.conf文件如同上文
sql脚本:
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`update_time` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
mysql_student.conf文件
vi /user/local/sql/mysql_student.conf
input {jdbc {jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.46.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.1.102:3306/test"jdbc_user => "root"jdbc_password => "123456"schedule => "* * * * *"statement => "SELECT * FROM student WHERE update_time >= :sql_last_value"use_column_value => truetracking_column_type => "timestamp"tracking_column => "update_time"last_run_metadata_path => "syncpoint_table"}
}output {elasticsearch {# ES的IP地址及端口hosts => ["192.168.112.150:9200"]# 索引名称 可自定义index => "student"# 需要关联的数据库中有有一个id字段,对应类型中的iddocument_id => "%{id}"document_type => "student"}stdout {# JSON格式输出codec => json_lines}
}
修改config/pipelines.yml文件
cd /usr/local/logstash-6.4.3
vi config/pipelines.yml
在最下面加上配置即可
启动logstash
cd /usr/local/logstash-6.4.3
./bin/logstash
启动成功之后,往user表和student表都插入数据
可以看到logstash输出的日志:
使用kibana查看:
使用Logstash将MySQL同步到Elasticsearch相关推荐
- elasticsearch-jdbc实现MySQL同步到ElasticSearch深入详解
1.如何实现mysql与elasticsearch的数据同步? 逐条转换为json显然不合适,需要借助第三方工具或者自己实现.核心功能点:同步增.删.改.查同步. 2.mysql与elasticsea ...
- linux es连接mysql_LINUX下使用elasticsearch-jdbc工具实现MySQL同步到ElasticSearch 以及linux 64位centos系统安装jdk1.8...
第一步:环境匹配 1)elasticsearch 2.3.3 成功安装部署 2)mysql安装成功,增删改查无误~~. 3)要保证elasticsearch-jdbc的版本要与elasticsearc ...
- LINUX下使用elasticsearch-jdbc工具实现MySQL同步到ElasticSearch 以及linux 64位centos系统安装jdk1.8
标签: 第一步:环境匹配 1)elasticsearch 2.3.3 成功安装部署 2)mysql安装成功,增删改查无误~~. 3)要保证elasticsearch-jdbc的版本要与elastic ...
- 通过Logstash实现mysql数据定时增量同步到ES
文章目录 前言 一.系统配置 二.同步步骤整体概览 三.logstash数据同步实战 1.新建mysql表 2.ES中新建索引 3.Logstash 管道配置 4.启动Logstash 5.测试 6. ...
- 使用canal同步MySQL数据到Elasticsearch(ES)
目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...
- logstash mysql 准实时同步到 elasticsearch
mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是在数据分析方面稍有不足,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 m ...
- ElasticSearch + Logstash进行数据库同步
介绍 在我们使用mysql和elasticsearch结合使用的时候,可能会有一些同步的需求,想要数据库和elasticsearch同步的方式其实有很多. 可以使用canal,它主要是监听mysql的 ...
- logstash实现mysql数据库表实时同步
logstash可以将不同数据源,例如日志.文件.或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据.(实例:数据库DB1中的表A有添加或者修改 ...
- elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践
来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...
最新文章
- LeetCode简单题之距离顺序排列矩阵单元格
- 通过xml 生成html页面,使用xmldom在服务器端生成静态html页面
- ActionScript 3 作用域内部细节介绍
- 思科设备路由器间IPsec ×××实现私网之间通信实战
- mongdb 群集_通过对比群集分配进行视觉特征的无监督学习
- 拳王虚拟项目公社:2020主流的虚拟资源项目,最新最全自动化系统玩法
- 有机食品海报这样设计,收获了意想不到的效果…
- 《深入理解Nginx》阅读与实践(一):Nginx安装配置与HelloWorld
- ehcache缓存共享(rmi方法)
- Windows开发签名工具(SignTool)下载
- java初学者代码练习题
- 管理_立项任务书怎么写——毛宇菲
- BPSK调制解调matlab仿真
- 交叉编译 WPA_Supplicant
- 一文掌握数仓中auto analyze的使用
- cf. ConneR and the A.R.C. Markland-N
- 0005 前端 Html 04 AutoFileName 图片的显示 文字链接 图片链接 页面内链接 列表 加超链接的列表
- Android Pixel手机Notification小图标显示白方块问题
- shiro安全框架扩展教程--如何扩展realm桥接器并退出自动清空角色资源缓存
- GDUFS 2018信息学院程序设计新手赛(正式赛)题解