一:在linux环境下安装部署好mysql

1 :开启binlog

sudo vi /etc/my.cnf (Mysql的配置文件)

2: mysql的binlog格式有3种,为了把binlog解析成json数据格式,要设置binlog的格式为row(binlog有三种格式:Statement、Row以及Mixed)

server-id=1

log-bin=/data/mysql/log/binlog(这一步开启binlog)

binlog_format=row

具体如下图:

3:重启msyql服务

sudo service mysqld restart

4:查看是否已经开启binlog

Mysql>show variables like '%log_bin%';

此时在/data/mysql/log/binlog目录下可以看到生成了相应的binlog监听日志文件,如图,binlog.000004文件,每次重启msyql服务,就会生成一个新的监听文件,这里日志文件名称跟步骤2中配置的log-bin=/data/mysql/log/binlog有关,如果log-bin=master配置这样,那么日志文件名称就是master.000004。

二:配置Maxwell相关的部署工作

1:下载Maxwell

官网:http://maxwells-daemon.io/

组件下载链接

2:上传maxwell-1.11.0.tar.gz

通过winscp等FTP工具把maxwell-1.11.0.tar.gz到/usr/local/maxwell/ 目录下

3:解压maxwell-1.11.0.tar.gz

tar -zxvf maxwell-1.11.0.tar.gz

4:创建maxwell使用的mysql用户并赋权

mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';

mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';

以上图片为官网参考

以我自己的为例:

GRANT ALL on *.* to 'user01'@'%' identified by 'test123';

把所有数据库的所有表授权给user01用户以密码test123登录

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';

flush privileges;

(mysql 新设置用户或更改密码后需用flush privileges刷新MySQL的系统权限相关表,否则会出现拒绝访问,还有一种方法,就是重新启动mysql服务器,来使新设置生效。­)

4::开启maxwell命令行(注意,如果没有设置,maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)

bin/maxwell --user='maxwell' --password='xxxx' --host='数据库IP地址' --producer=kafka --kafka.bootstrap.servers=bi-master:9092

解释:数据库IP地址参数是安装mysql的那台主机,最后的kafka.bootstrap.servers是安装kafka集群的节点主机名(最好不要用IP地址)和端口号。

三:kafka相关配置

说明(以下我的kafka是安装在主机名叫bi-master,注意kafka里的配置文件端口号要和命令行里给的端口号一致)

1:首先启动zookeeper

$sbin/zkServer.sh start

2:开启kafka命令行

bin/kafka-server-start.shconfig/server.properties

3:在kafka中创建一个主题叫maxwell以便于接受数据

bin/kafka-topics.sh--create --zookeeper bi-master:2181 --replication-factor 1 --partitions 1 --topic maxwell

4:启动kafka消息生产者窗口

bin/kafka-console-producer.sh --broker-list bi-master:9092 --topic maxwell

5:启动kafka消息消费者窗口

bin/kafka-console-consumer.sh --zookeeper bi-master:2181 --topic maxwell --from-beginning

四:测试

1:在Mysql客户工具 中添加一个数据

测试的表结构,表名称:myTest

消费窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"insert","ts":1515494531,"xid":7693,"commit":true,"data":{"name":"小梅","sex":"女","age":18,"address":"深圳市南山区海岸城"}}

2:修改一条数据:

消费窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"update","ts":1515494707,"xid":7756,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"},"old":{"sex":"女","address":"深圳市南山区海岸城"}}

3:删除一条数据

消费窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"delete","ts":1515494807,"xid":7799,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田区"}}

五:通过java程序去消费kafka消息数据

1:jar包添加

2:代码实现

第一种实现:

第二种实现:

java实时解析mysql日志,利用maxwell 组件实时监听Mysql的Binlog日志,并且把解析的json格式数据发送到kafka窗口供实时消费...相关推荐

  1. 监听mysql表内容变化 使用canal_2 监听mysql表内容变化,使用canal

    mysql本身是支持主从的(master slave),原理就是master产生的binlog日志记录了所有的增删改语句,将binlog发送到slave节点进行执行即可完成数据的同步. canal是阿 ...

  2. php监听网页日志,如何用php程序监听一个不断增长的日志文件

    首先这个日志文件写入不是很频繁,它每一行就是一条有效的日志.我想php程序来监听这个文件,每当被写入一行的时候,我的php就自动读入一行,做出分析然后做相应的处理.请问要如何实现呢? 回复内容: 首先 ...

  3. oracle中多层嵌套命名,一种多层嵌套的json格式数据的命名解析方法

    一种多层嵌套的json格式数据的命名解析方法 [专利摘要]一种n层嵌套的json格式数据的命名解析方法,包括以下步骤:首先,以n层嵌套的json格式数据为基础建立数据源:然后,以键值对为最小单元由外而 ...

  4. Canal监听mysql的binlog日志实现数据同步

    Canal监听mysql的binlog日志实现数据同步 1. canal概述 1.1 canal简介 1.2 技术选型 1.3 原理分析 1.3.1 MySQL主备复制原理 1.3.2 canal原理 ...

  5. Java监听mysql的binlog详解(mysql-binlog-connector)

    Java监听mysql的binlog详解(mysql-binlog-connector) 1. 需求概述 2. 技术选型 3. 方案设计 3.环境准备 3.1 查看是否开启binlog 3.2 mys ...

  6. 监听mysql表内容变化 使用canal,canal 监听同步指定数据库,所有表

    canal 监听同步指定数据库,所有表 canal 监听同步指定数据库,所有表 因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧 先将 MyS ...

  7. vue_组件_监听组件事件

    1.$emit 的使用 在组件中注册自定义事件 $emit(事件名, 参数)    //该参数会当作第一个参数传入绑定的函数中 下面用一个菜单栏例子来说明,如下图所示 组件 Vue.component ...

  8. app开发历程————Android程序解析服务器端的JSON格式数据,显示在界面上

    上一篇文章写的是服务器端利用Servlet 返回JSON字符串,本文主要是利用android客户端访问服务器端链接,解析JSON格式数据,放到相应的位置上. 首先,android程序的布局文件main ...

  9. 学习日志day45(2021-09-09)(1、有道翻译API使用json格式数据 2、JSONP 3、Ajax提交form表单 4、Web Uploader)

    学习内容:学习JavaWeb(Day45) 1.有道翻译API使用json格式数据 2.JSONP 3.Ajax提交form表单 4.Web Uploader 1.有道翻译API使用json格式数据 ...

  10. Ajax 发送json格式数据以及发送文件(FormData)和自带的序列化组件: serializers

    前后端传输数据的编码格式(contentType) get请求数据就是直接放在url?后面的 url?usernmae=junjie&password=123... 可以向后端发送post请求 ...

最新文章

  1. 开始使用博客了,改变从这里开始。
  2. Objective-c——UI基础开发第十二天(相册展示)
  3. java 如何计算数据库_java – 如何在我的数据库中计算这些变量?
  4. 缓存核心知识小抄,面试必备,赶紧收藏!
  5. sc delete:指定的服务已经标记为删除
  6. VTS工具测试指定的testcase函数(以VtsHalKeymasterV4_0TargetTest为例)
  7. SET_TABLE_FOR_FIRST_DISPLAY 参数 I_SAVE
  8. WordPress后台添加侧边栏菜单
  9. 数据段、代码段、堆栈段、BSS段
  10. 【Java基础】3、Java 位运算(移位、位与、或、异或、非)
  11. python-day8-循环补充
  12. linux jmeter 内存,怎么在Linux下改变JMeter内存
  13. 推荐一个卡巴斯基绿色版本
  14. sqlite developer注册码(转)
  15. 二维码----百度百科
  16. V-Rep机器人仿真软件模型导入部分
  17. 2021程序员的出路在哪里
  18. 引入icon.styl字体文件无法解析报错
  19. html输入日期算出星座,如何通过日期计算星座
  20. [BZOJ3162]独钓寒江雪

热门文章

  1. 机器学习精讲中7.1固定特征核中关于N维向量多项式特征映射的理解(二)
  2. VS2013环境下GSL数学库的使用说明(亲测)
  3. 最新!2020录取分数百强大学榜发布:清华文科第一,北大理科第一!
  4. BUPT复试专题—最小距离查询(2013)
  5. HDU POJ 1015 Jury Compromise(陪审团的人选,DP)
  6. [数字dp] hdu 3271 SNIBB
  7. 文本相似度算法——空间向量模型的余弦算法和TF-IDF
  8. 如何知道是哪个进程造成死锁?如何把这个进程杀掉?
  9. Ubuntu怎么从图形桌面切换到命令行界面
  10. JAVA“类”数组的创建与调用