https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

8.11.1. Installing Logstash

Install the JDBC driver and Logstash:

curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh   | bash
curl -s https://raw.githubusercontent.com/oscm/shell/master/search/logstash/logstash-5.x.sh | bash

The MySQL driver jar is installed at /usr/share/java/mysql-connector-java.jar.

8.11.2. Configuring Logstash

Create the configuration file /etc/logstash/conf.d/jdbc-mysql.conf. The source table to be imported looks like this:

mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}

8.11.3. Starting Logstash

root@netkiller /var/log/logstash % systemctl restart logstash
root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
 Main PID: 10434 (java)
   CGroup: /system.slice/logstash.service
           └─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...

Jul 31 09:35:00 netkiller systemd[1]: Started logstash.
Jul 31 09:35:00 netkiller systemd[1]: Starting logstash...

root@netkiller /var/log/logstash % cat logstash-plain.log
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline        ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc     ] (0.006000s) select * from article   

8.11.4. Verification

% curl -XGET 'http://localhost:9200/_all/_search?pretty'
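
If the import succeeded, the documents appear in the information index created by the configuration above. A more targeted check, assuming the index and type names from that configuration (the search keyword is only a placeholder):

# count the documents imported so far
curl -XGET 'http://localhost:9200/information/_count?pretty'
# full-text search on an indexed field; replace "test" with a real keyword
curl -XGET 'http://localhost:9200/information/article/_search?pretty&q=title:test'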

8.11.5. Configuration templates

8.11.5.1. Full import

Suitable for archived data that never changes, or for append-only data that is never modified.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}

8.11.5.2. Importing multiple tables

Import several tables into Elasticsearch:

# multiple inputs on logstash jdbc
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "%{type}"
    document_id => "%{id}"
  }
}

Add a type setting to each jdbc block, then set document_type => "%{type}" in the elasticsearch output block.

8.11.5.3. Incremental copy by primary-key ID

input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}

tracking_column_type => "numeric" declares the data type of the id column; if it is omitted, the tracking column is treated as a date/timestamp by default, which produces queries like this:

[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc     ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'

If the copied data gets out of sync, add the clean_run => true option to clear the saved state and re-import from scratch.

8.11.5.4. Incremental copy by date field

input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    # ... other configuration bits
  }
}

If the copied data gets out of sync, add the clean_run => true option to clear the saved state and re-import from scratch.

8.11.5.5. Loading the SQL from a file

statement_filepath points at a SQL file. When the SQL is too complex to maintain conveniently inside the statement option, write it to a separate text file and reference that file with statement_filepath.

input {
  jdbc {
    jdbc_driver_library => "/path/to/driver.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
    jdbc_user => "neo"
    jdbc_password => "password"
    statement_filepath => "query.sql"
  }
}

8.11.5.6. Passing parameters

Put the query condition values into the parameters option:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}

8.11.5.7. Controlling how much data JDBC returns

jdbc_fetch_size => 1000      # JDBC fetch size (how many rows are fetched per round trip)
jdbc_page_size => 1000       # page size
jdbc_paging_enabled => true  # combined with jdbc_page_size, splits the statement into multiple
                             # queries, equivalent to: SELECT * FROM table LIMIT 1000 OFFSET 4000

8.11.5.8. Output to different Elasticsearch clusters

Use if [type] == "news" conditions to run different output blocks, so each type is written to its designated cluster and index.

output {
  if [type] == "news" {
    elasticsearch {
      hosts => "node1.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }
  if [type] == "comment" {
    elasticsearch {
      hosts => "node2.netkiller.cn:9200"
      index => "information"
      document_id => "%{id}"
    }
  }
}

8.11.5.9. Date format conversion

Date formatting: convert the ISO 8601 date format into %Y-%m-%d %H:%M:%S.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article limit 5"
  }
}
filter {
  ruby {
    init => "require 'time'"
    code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
  ruby {
    init => "require 'time'"
    code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

8.11.5.10. Example

The example below copies new rows and also picks up updates to rows that already exist:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"    # cron schedule expression; run once a minute
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"    # cron schedule expression; run once a minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"       # action to perform; one of "index", "delete", "create", "update"
    doc_as_upsert => true    # create the document when it does not exist yet (upsert)
  }
}

8.11.6. Handling data asymmetry

The jdbc input plugin can only append: it writes new rows into Elasticsearch incrementally, but the database on the JDBC side frequently performs deletes and updates as well. As a result the database and the search engine drift out of sync.

Of course, if you have a development team, you can write application code that updates the search engine whenever a row is deleted or updated. If that is not an option, try the approach below.

Here we have an article table whose mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time the row is updated:

mysql> desc article;
+-------------+---------------+------+-----+-------------------+------------------------------+
| Field       | Type          | Null | Key | Default           | Extra                        |
+-------------+---------------+------+-----+-------------------+------------------------------+
| id          | int(11)       | NO   |     | 0                 |                              |
| title       | mediumtext    | NO   |     | NULL              |                              |
| description | mediumtext    | YES  |     | NULL              |                              |
| author      | varchar(100)  | YES  |     | NULL              |                              |
| source      | varchar(100)  | YES  |     | NULL              |                              |
| content     | longtext      | YES  |     | NULL              |                              |
| status      | enum('Y','N') | NO   |     | N                 |                              |
| ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP |                              |
| mtime       | timestamp     | YES  |     | NULL              | on update CURRENT_TIMESTAMP  |
+-------------+---------------+------+-----+-------------------+------------------------------+
9 rows in set (0.00 sec)

Add an mtime-based query rule to the Logstash input:

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *"    # cron schedule expression; run once a minute
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp"
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
}

Create a trash (recycle-bin) table. It handles rows that are deleted from the database or disabled with status = 'N'.

CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Create triggers on the article table:

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
  -- When an article's status changes to 'N', the corresponding document must be removed from the search engine.
  IF NEW.status = 'N' THEN
    insert into elasticsearch_trash(id) values(OLD.id);
  END IF;
  -- When the status changes back to 'Y', delete the recycle record from elasticsearch_trash,
  -- otherwise the article ID would still be there and the document would be removed by mistake.
  IF NEW.status = 'Y' THEN
    delete from elasticsearch_trash where id = OLD.id;
  END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
  -- When an article is deleted, also put it into the search-engine trash table.
  insert into elasticsearch_trash(id) values(OLD.id);
END

Next we need a simple shell script that runs once a minute, reads the rows from the elasticsearch_trash table, and then calls the Elasticsearch RESTful API with curl to delete the recycled documents; a minimal sketch follows.
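
A minimal sketch of such a cleanup script, assuming the cms credentials, the elasticsearch_trash table, and the information/article index used earlier in this chapter; the script path and cron line are only examples:

#!/bin/bash
# elasticsearch-trash.sh -- run from cron once a minute, for example:
#   * * * * * /usr/local/bin/elasticsearch-trash.sh >> /var/log/elasticsearch-trash.log 2>&1

MYSQL="mysql -h localhost -u cms -ppassword -D cms -N -B"
ES="http://localhost:9200/information/article"

# read the article IDs that the triggers queued for deletion
for id in $($MYSQL -e "SELECT id FROM elasticsearch_trash"); do
    # delete the document from Elasticsearch
    curl -s -XDELETE "$ES/$id" > /dev/null
    # remove the processed row from the trash table
    $MYSQL -e "DELETE FROM elasticsearch_trash WHERE id = $id"
done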

8.11.7. Modifying the mapping

Requirement: change the Elasticsearch date format from ISO 8601 to yyyy-MM-dd HH:mm:ss. First stop Logstash and clear its saved run state:

systemctl stop logstash
rm -rf /var/tmp/article*

Edit the /etc/logstash/conf.d/jdbc.conf configuration file:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"    # cron schedule expression; run once a minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }
}
filter {
  ruby {
    code => "event.set('ctime', event.get('[ctime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
  ruby {
    code => "event.set('mtime', event.get('[mtime]').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true
  }
}

Delete the old index, recreate it, and configure the mapping:

curl -XDELETE http://localhost:9200/information
curl -XPUT http://localhost:9200/information
curl -XPOST http://localhost:9200/information/article/_mapping -d'
{
  "properties": {
    "title":       { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
    "description": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
    "content":     { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
    "ctime":       { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
    "mtime":       { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
  }
}'
curl "http://localhost:9200/information/article/_mapping?pretty"

Clear the old logs and start Logstash to re-copy the data:

rm -rf /var/log/logstash/*
systemctl start logstash            
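
Optionally, confirm that the re-import picked up the new mapping (the log path and index name come from the sections above):

# watch the Logstash log for the next scheduled jdbc run
tail -n 20 /var/log/logstash/logstash-plain.log
# ctime/mtime should now be mapped as "yyyy-MM-dd HH:mm:ss" dates
curl "http://localhost:9200/information/article/_mapping?pretty"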

Original source: the Netkiller series of handbooks.
Author: 陈景峯
Contact the author before reprinting, and be sure to include the original source, the author information, and this notice.
