0、题记

实际业务场景中,会遇到基础数据存在 Mysql 中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。

而mysql写入kafka的选型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他类似方案。

其中:debezium和flume是基于 mysql binlog 实现的。

如果需要同步历史全量数据+实时更新数据,建议使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc实现关系型 数据库 到Elasticsearch等的同步。

实际上, 核心logstash的同步原理的掌握 ,有助于大家理解类似的各种库之间的同步。

logstash 核心原理 :输入生成事件,过滤器修改它们,输出将它们发送到其他地方。

logstash核心三部分组成:input、filter、output。

input { }

filter { }

output { }

1.1 input输入

包含但远不限于:

jdbc:关系型数据库:mysql、 oracle 等。

file:从文件系统上的文件读取。

syslog:在已知端口514上侦听syslog消息。

redis:redis消息。beats:处理 Beats发送的事件。

kafka:kafka实时数据流。

1.2 filter过滤器

过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。

可以把它比作数据处理的 ETL 环节。

一些有用的过滤包括:

grok:解析并构造任意文本。 Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式 。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!

mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。

drop:完全删除事件,例如调试事件。

clone:制作事件的副本,可能添加或删除字段。

geoip:添加有关IP地址的地理位置的信息。

1.3 output输出

输出是Logstash管道的最后阶段。一些常用的输出包括:

elasticsearch:将事件数据发送到Elasticsearch。

file:将事件数据写入磁盘上的文件。

kafka:将事件写入Kafka。

详细的filter demo参考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置参考

input {

jdbc {

jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"

jdbc_user => "root"

jdbc_password => "xxxxxxx"

jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

#schedule => "* * * * *"

statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"

use_column_value => true

tracking_column => "id"               tracking_column_type => "numeric"

record_last_run => true

last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"         }

}

filter {

ruby{

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"

}

ruby{

code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"

}

mutate {

remove_field => [ "@version" ]

remove_field => [ "@timestamp" ]

remove_field => [ "gather_time" ]

remove_field => [ "publish_time" ]

}

}

output {

kafka {

bootstrap_servers => "192.168.1.13:9092"

codec => json_lines

topic_id => "mytopic"

}

file {

codec => json_lines

path => "/tmp/output_a.log"

}

}

以上内容不复杂,不做细讲。

注意:

Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",

是将Mysql中的时间格式转化为时间戳格式。

3、坑总结

3.1 坑1字段大小写问题

from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names

=> "false"  这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。

最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false"  。记录下来希望可以帮到更多人。

3.2 同步到ES中的数据会不会重复?

想将关系数据库的数据同步至ES中,如果在集群的多台 服务器 上同时启动logstash。

解读:实际项目中就是没用随机id  使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据

3.3 相同配置logstash,升级6.3之后不能同步数据。

解读:高版本基于时间增量有优化。

tracking_column_type => "timestamp" 应该是需要指定标识为时间类型,默认为数字类型numeric

3.4 ETL字段统一在哪处理?

解读:可以logstash同步mysql的时候sql查询阶段处理,如: select a_value as avalue*** 。

或者filter阶段处理, mutate rename 处理。

mutate {

rename => ["shortHostname", "hostname" ]

}

或者kafka阶段借助kafka stream处理。

4、小结

相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。

需要结合实际业务场景做深入的研究和性能分析。

有问题,欢迎留言讨论。

推荐阅读:

3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3

4、新的实现:http://t.cn/EaAt60O

5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现: http://t.cn/EaAtjqN

加入星球,更短时间更快习得更多干货!

日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解相关推荐

  1. MySQL 5.7主从复制从零开始设置及全面详解——实现多线程并行同步,解决主从复制延迟问题!

    MySQL 5.7主从复制从零开始设置及全面详解--实现多线程并行同步,解决主从复制延迟问题! 参考文章: (1)MySQL 5.7主从复制从零开始设置及全面详解--实现多线程并行同步,解决主从复制延 ...

  2. MySQL Installer 8.0.21安装教程图文详解 转载

    MySQL Installer 8.0.21安装教程图文详解 原地址 1. 缘由 刚好需要在新系统上重新安装MySQL,便写了一份的下载安装教程,供查阅,以防日后细节有所遗忘. 2. 版本说明 MyS ...

  3. mysql安装后目录介绍,MySQL安装后的目录结构及配置文件详解

    MySQL安装后的目录结构及配置文件详解 MySQL安装后的目录结构及配置文件详解 MySQL目录结构说明 MySQL 安装完成后,会在磁盘上生成一个目录,该目录就被称为 MySQL 的安装目录. M ...

  4. Mysql配置文件my.cnf配置及配置参数详解

    Mysql配置文件my.cnf 安装了mysql没有my.cnf文件的情况 1.可以把mysql的示例配置文件,如my-medium.cnf拷贝到/etc/my.cnf,再去修改/etc/my.cnf ...

  5. mysql 5.7的配置文件_MySQL5.7配置文件详解

    [client] port = 3635 socket = /usr/local/mysql/socket/mysql.sock [mysqld] user = mysql #定义MySQL启动用户 ...

  6. [Python从零到壹] 八.数据库之MySQL和Sqlite基础知识及操作万字详解

    欢迎大家来到"Python从零到壹",在这里我将分享约200篇Python系列文章,带大家一起去学习和玩耍,看看Python这个有趣的世界.所有文章都将结合案例.代码和作者的经验讲 ...

  7. MySQL安装详细教程(小白式安装详解)

    MySQL安装详细教程(小白式安装详解) 1.下载地址 1.1地址 https://dev.mysql.com/downloads/mysql/ 下载链接 1.2下载版本 2.安装配置 2.1路径(路 ...

  8. win mysql5.7 msi_win10 安装 mysql 5.7 msi版的教程图文详解

    我装msi格式的,主要是想看看装完的my.ini, 文件位置C:\ProgramData\MySQL\MySQL Server 5.7\my.ini, 注意:ProgramData是隐藏文件夹 mys ...

  9. php mysql修改命令_PHP编程:mysql alter table命令修改表结构实例详解

    <PHP编程:mysql alter table命令修改表结构实例详解>要点: 本文介绍了PHP编程:mysql alter table命令修改表结构实例详解,希望对您有用.如果有疑问,可 ...

  10. mysql int(3)与int(11)的区别详解

    这篇文章主要介绍了mysql int(3)与int(11)的区别详解的相关资料,需要的朋友可以参考下 mysql int(3)与int(11)的区别 总结,int(M) zerofill,加上zero ...

最新文章

  1. 5-1 Django的路由层(urlconf)
  2. linux轻量级进程LWP
  3. Ant Design Pro+Electron+electron-builder实现React应用脱离浏览器,桌面安装运行
  4. CCScene切换的所有特效(28种)以及设置屏幕横竖屏!
  5. python如何导入numpy简书_Webpack 之常用配置(一)
  6. 2012021401
  7. python自定义函数的关键字_Python3.x中自定义比较函数
  8. 《并行计算的编程模型》一3.5 远程内存访问:put和get
  9. php连接云数据库服务器,服务器使用PHP连接sqlserver数据库
  10. Android Studio中.9.png文件出错问题
  11. 安装Mercurial进行版本管理
  12. 西北工业大学计算机专业课考什么,西北工业大学西工大计算机考研经验
  13. 概率论-随机变量的数字特征思维导图
  14. html如何添加微信好友,如何加微信好友,微信加人小妙招
  15. eclipse中 错误: 找不到或无法加载主类 f.B
  16. dumple什么意思_dump是什么意思
  17. VScommunity2019 0x00007FFCCA14B7EC (ucrtbased.dll) (Project1.exe 中)处有未经处理的异常: 将一个无效参数传递给了将无效参数视为严重错误
  18. SAP PS 第15节 预算管理
  19. Java霸王的大陆梦幻版_三国志霸王的大陆,喜欢三国的进
  20. BigDecimal ROUND_HALF_DOWN精度问题

热门文章

  1. wrf 嵌套网格作用_在网格系统中使用响应列,嵌套列和偏移列 引导程序
  2. mac mail 删除邮件服务器,Mac邮件应用程序Mail设置
  3. bandizip最后一个无广告版本_如果非要选择一款压缩软件的话——Bandizip
  4. python定义变量名的时候、需要注意问题_python中将函数赋值给变量时需要注意的一些问题...
  5. android光照传感器,详解 android 光线传感器 light sensor的使用
  6. agp模式_AGP的完整形式是什么?
  7. java bitset_Java BitSet intersects()方法与示例
  8. Java ClassLoader findLibrary()方法与示例
  9. oracle中dbms_如何在DBMS中找到关系的最高范式?
  10. kotlin字符串数组_Kotlin程序读取,遍历,反向和排序字符串数组