一、环境准备

搭建es和kibana,可以参考我前面写的文章

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

二、环境搭建

将logstash安装包上传到Linux服务器下,安装logstash-input-jdbc和logstash-output-elasticsearch两个插件。
具体步骤如下:

### 上传logstash的安装包到/usr/local目录下
### 解压
tar -zxvf logstash-6.4.3.tar.gz
cd logstash-6.4.3
### 安装插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
### 在/usr/local目录下新建sql文件夹
mkdir /usr/local/sql
### 将mysql驱动jar包上传到该文件夹下

在SQL目录下新建文件
vi mysql_user.conf

input {jdbc {jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.46.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.1.102:3306/test"jdbc_user => "root"jdbc_password => "123456"schedule => "* * * * *"statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"use_column_value => truetracking_column_type => "timestamp"tracking_column => "update_time"last_run_metadata_path => "syncpoint_table"}
}output {elasticsearch {# ES的IP地址及端口hosts => ["192.168.112.150:9200"]# 索引名称 可自定义index => "user"# 需要关联的数据库中有有一个id字段,对应类型中的iddocument_id => "%{id}"document_type => "user"}stdout {# JSON格式输出codec => json_lines}
}

配置文件说明:

jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载
jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了
jdbc_connection_string: mysql 地址
jdbc_user: mysql 用户
jdbc_password: mysql 密码
schedule: 执行 sql 时机,类似 crontab 的调度
statement: 要执行的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量
use_column_value: 使用递增列的值
tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手

sql脚本:

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`update_time` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

启动logstash

cd /usr/local/logstash-6.4.3
./bin/logstash -f /usr/local/sql/mysql_user.conf


在user表中插入一条数据:

logstash打印出了日志:

使用kibana查询:

logstash-input-jdbc原理
使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段

多文件方式同步ES数据

一个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置文件就可以了,假设我们有两个表 user和 student,对应两个配置文件 mysql_user.conf 和 mysql_student.conf,在 config/pipelines.yml 中配置即可。
user表和mysql_user.conf文件如同上文
sql脚本:

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`update_time` timestamp(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0),PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

mysql_student.conf文件
vi /user/local/sql/mysql_student.conf

input {jdbc {jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.46.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.1.102:3306/test"jdbc_user => "root"jdbc_password => "123456"schedule => "* * * * *"statement => "SELECT * FROM student WHERE update_time >= :sql_last_value"use_column_value => truetracking_column_type => "timestamp"tracking_column => "update_time"last_run_metadata_path => "syncpoint_table"}
}output {elasticsearch {# ES的IP地址及端口hosts => ["192.168.112.150:9200"]# 索引名称 可自定义index => "student"# 需要关联的数据库中有有一个id字段,对应类型中的iddocument_id => "%{id}"document_type => "student"}stdout {# JSON格式输出codec => json_lines}
}

修改config/pipelines.yml文件
cd /usr/local/logstash-6.4.3
vi config/pipelines.yml
在最下面加上配置即可

启动logstash

cd /usr/local/logstash-6.4.3
./bin/logstash

启动成功之后,往user表和student表都插入数据


可以看到logstash输出的日志:

使用kibana查看:

使用Logstash将MySQL同步到Elasticsearch相关推荐

  1. elasticsearch-jdbc实现MySQL同步到ElasticSearch深入详解

    1.如何实现mysql与elasticsearch的数据同步? 逐条转换为json显然不合适,需要借助第三方工具或者自己实现.核心功能点:同步增.删.改.查同步. 2.mysql与elasticsea ...

  2. linux es连接mysql_LINUX下使用elasticsearch-jdbc工具实现MySQL同步到ElasticSearch 以及linux 64位centos系统安装jdk1.8...

    第一步:环境匹配 1)elasticsearch 2.3.3 成功安装部署 2)mysql安装成功,增删改查无误~~. 3)要保证elasticsearch-jdbc的版本要与elasticsearc ...

  3. LINUX下使用elasticsearch-jdbc工具实现MySQL同步到ElasticSearch 以及linux 64位centos系统安装jdk1.8

    标签: 第一步:环境匹配 1)elasticsearch 2.3.3 成功安装部署  2)mysql安装成功,增删改查无误~~. 3)要保证elasticsearch-jdbc的版本要与elastic ...

  4. 通过Logstash实现mysql数据定时增量同步到ES

    文章目录 前言 一.系统配置 二.同步步骤整体概览 三.logstash数据同步实战 1.新建mysql表 2.ES中新建索引 3.Logstash 管道配置 4.启动Logstash 5.测试 6. ...

  5. 使用canal同步MySQL数据到Elasticsearch(ES)

    目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...

  6. logstash mysql 准实时同步到 elasticsearch

    mysql 作为成熟稳定的数据持久化解决方案,广泛地应用在各种领域,但是在数据分析方面稍有不足,而 elasticsearch 作为数据分析领域的佼佼者,刚好可以弥补这项不足,而我们要做的只需要将 m ...

  7. ElasticSearch + Logstash进行数据库同步

    介绍 在我们使用mysql和elasticsearch结合使用的时候,可能会有一些同步的需求,想要数据库和elasticsearch同步的方式其实有很多. 可以使用canal,它主要是监听mysql的 ...

  8. logstash实现mysql数据库表实时同步

    logstash可以将不同数据源,例如日志.文件.或jdbc等,同步到ElasticSearch中,本文利用logstash实现mysql数据库表之间的数据.(实例:数据库DB1中的表A有添加或者修改 ...

  9. elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...

最新文章

  1. LeetCode简单题之距离顺序排列矩阵单元格
  2. 通过xml 生成html页面,使用xmldom在服务器端生成静态html页面
  3. ActionScript 3 作用域内部细节介绍
  4. 思科设备路由器间IPsec ×××实现私网之间通信实战
  5. mongdb 群集_通过对比群集分配进行视觉特征的无监督学习
  6. 拳王虚拟项目公社:2020主流的虚拟资源项目,最新最全自动化系统玩法
  7. 有机食品海报这样设计,收获了意想不到的效果…
  8. 《深入理解Nginx》阅读与实践(一):Nginx安装配置与HelloWorld
  9. ehcache缓存共享(rmi方法)
  10. Windows开发签名工具(SignTool)下载
  11. java初学者代码练习题
  12. 管理_立项任务书怎么写——毛宇菲
  13. BPSK调制解调matlab仿真
  14. 交叉编译 WPA_Supplicant
  15. 一文掌握数仓中auto analyze的使用
  16. cf. ConneR and the A.R.C. Markland-N
  17. 0005 前端 Html 04 AutoFileName 图片的显示 文字链接 图片链接 页面内链接 列表 加超链接的列表
  18. Android Pixel手机Notification小图标显示白方块问题
  19. shiro安全框架扩展教程--如何扩展realm桥接器并退出自动清空角色资源缓存
  20. GDUFS 2018信息学院程序设计新手赛(正式赛)题解

热门文章

  1. rsync远程同步(增量备份)
  2. windows服务器局域网内与某台服务器时间同步
  3. 2015第七届HCTF
  4. 【影像常识】CT的“层“与“排”的区别
  5. 谨以此文纪念逝去的SUN
  6. 陈雨强:GPT等大模型应用落地需关注内容可信、数据安全、成本可控
  7. 《MATLAB智能算法30个案例》:第22章 蚁群算法的优化计算——旅行商问题(TSP)优化
  8. 第01章 Web应用概述
  9. 依据word模板批量生成试卷
  10. SAP中供应商采购冻结和冻结功能(质量原因)的应用区别?