第0章:简介

(1)下面是http://code.google.com中的binlog事件分析结构图:

(2)获取开源包的maven坐标

com.google.code

open-replicator

1.0.5

(3)参考网站

(4)札记

1)Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。

2)Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。

3)在程序结构上,最主要的设计原则是高性能,低内存占用。Open Replicator目前只支持MySQL5.0及以上版本。

第1章:实践

(1)包装Open Replicator类(AutoOpenReplicator.java)

package com.mcc.core.openReplicator;

import com.google.code.or.OpenReplicator;

import com.google.code.or.common.glossary.column.StringColumn;

import com.google.code.or.net.Packet;

import com.google.code.or.net.Transport;

import com.google.code.or.net.impl.packet.EOFPacket;

import com.google.code.or.net.impl.packet.ResultSetRowPacket;

import com.google.code.or.net.impl.packet.command.ComQuery;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.List;

import java.util.concurrent.TimeUnit;

/**

* MySQL binlog分析程序 ,用到open-replicator包

* 增加加自动配置binlog位置及重连机制

*

* @author menergy

*         DateTime: 13-12-26  下午2:22

*/

public class AutoOpenReplicator extends OpenReplicator {

// members

private static Logger logger = LoggerFactory.getLogger(AutoOpenReplicator.class);

private boolean autoReconnect = true;

// timeout auto reconnect , default 30 second

private int delayReconnect = 30;

// default timeout is 60 second, after timeout will be reconnect!

private int defaultTimeout = 120 * 1000;

// COM Query Transport

private Transport comQueryTransport;

// static block

// constructors

// properties

/**

* 是否自动重连

*

* @return 自动重连

*/

public boolean isAutoReconnect() {

return autoReconnect;

}

/**

* 设置自动重连

*

* @param autoReconnect 自动重连

*/

public void setAutoReconnect(boolean autoReconnect) {

this.autoReconnect = autoReconnect;

}

/**

* 断开多少秒后进行自动重连

*

* @param delayReconnect 断开后多少秒

*/

public void setDelayReconnect(int delayReconnect) {

this.delayReconnect = delayReconnect;

}

/**

* 断开多少秒后进行自动重连

*

* @return 断开后多少秒

*/

public int getDelayReconnect() {

return delayReconnect;

}

// public methods

// protected methods

@Override

public void start() {

do {

try {

long current = System.currentTimeMillis();

if (!this.isRunning()) {

if (this.getBinlogFileName() == null) updatePosition();

logger.info("Try to startup dump binlog from mysql master[{}, {}] ...", this.binlogFileName, this.binlogPosition);

this.reset();

super.start();

logger.info("Startup successed! After {} second if nothing event fire will be reconnect ...", defaultTimeout / 1000);

} else {

if (current - this.lastAlive >= this.defaultTimeout) {

this.stopQuietly(0, TimeUnit.SECONDS);

}

}

TimeUnit.SECONDS.sleep(this.getDelayReconnect());

} catch (Exception e) {

if (logger.isErrorEnabled()) {

logger.error("connect mysql failure!", e);

}

// reconnect failure, reget last binlog & position from master node and update cache!

//LoadCenter.loadAll(); // just update all cache, not flush!

updatePosition();

try {

TimeUnit.SECONDS.sleep(this.getDelayReconnect());

} catch (InterruptedException ignore) {

// NOP

}

}

} while (this.autoReconnect);

}

@Override

public void stopQuietly(long timeout, TimeUnit unit) {

super.stopQuietly(timeout, unit);

if (this.getBinlogParser() != null) {

// 重置, 当MySQL服务器进行restart/stop操作时进入该流程

this.binlogParser.setParserListeners(null); // 这句比较关键,不然会死循环

}

}

// friendly methods

// private methods

/**

* 自动配置binlog位置

*/

private void updatePosition() {

// 配置binlog位置

try {

ResultSetRowPacket binlogPacket = query("show master status");

if (binlogPacket != null) {

List values = binlogPacket.getColumns();

this.setBinlogFileName(values.get(0).toString());

this.setBinlogPosition(Long.valueOf(values.get(1).toString()));

}

} catch (Exception e) {

if (logger.isErrorEnabled()) {

logger.error("update binlog position failure!", e);

}

}

}

/**

* ComQuery 查询

*

* @param sql 查询语句

* @return

*/

private ResultSetRowPacket query(String sql) throws Exception {

ResultSetRowPacket row = null;

final ComQuery command = new ComQuery();

command.setSql(StringColumn.valueOf(sql.getBytes()));

if (this.comQueryTransport == null) this.comQueryTransport = getDefaultTransport();

this.comQueryTransport.connect(this.host, this.port);

this.comQueryTransport.getOutputStream().writePacket(command);

this.comQueryTransport.getOutputStream().flush();

// step 1

this.comQueryTransport.getInputStream().readPacket();

//

Packet packet;

// step 2

while (true) {

packet = comQueryTransport.getInputStream().readPacket();

if (packet.getPacketBody()[0] == EOFPacket.PACKET_MARKER) {

break;

}

}

// step 3

while (true) {

packet = comQueryTransport.getInputStream().readPacket();

if (packet.getPacketBody()[0] == EOFPacket.PACKET_MARKER) {

break;

} else {

row = ResultSetRowPacket.valueOf(packet);

}

}

this.comQueryTransport.disconnect();

return row;

}

private void reset() {

this.transport = null;

this.binlogParser = null;

}

// inner class

// test main

}

(2)事件监听器模板类(NotificationListener.java)

package com.mcc.core.openReplicator;

import com.google.code.or.binlog.BinlogEventListener;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.impl.event.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* Binlog事件监听器模板

*

* @author menergy

*         DateTime: 13-12-26  下午2:34

*/

public class NotificationListener implements BinlogEventListener {

private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);

private String eventDatabase;

/**

* 这里只是实现例子,该方法可以自由处理逻辑

* @param event

*/

@Override

public void onEvents(BinlogEventV4 event) {

Class> eventType = event.getClass();

// 事务开始

if (eventType == QueryEvent.class) {

QueryEvent actualEvent = (QueryEvent) event;

this.eventDatabase = actualEvent.getDatabaseName().toString();

//TODO,这里可以获取事件数据库信息,可做其它逻辑处理

logger.info("事件数据库名:{}",eventDatabase);

return;

}

// 只监控指定数据库

if (eventDatabase != null && !"".equals(eventDatabase.trim())) {

if (eventType == TableMapEvent.class) {

TableMapEvent actualEvent = (TableMapEvent) event;

long tableId = actualEvent.getTableId();

String tableName = actualEvent.getTableName().toString();

//TODO,这里可以获取事件表信息,可做其它逻辑处理

logger.info("事件数据表ID:{}, 事件数据库表名称:{}",tableId, tableName);

} else if (eventType == WriteRowsEvent.class) { // 插入事件

WriteRowsEvent actualEvent = (WriteRowsEvent) event;

long tableId = actualEvent.getTableId();

//TODO,这里可以获取写行事件信息,可做其它逻辑处理

logger.info("写行事件ID:{}",tableId);

} else if (eventType == UpdateRowsEvent.class) { // 更新事件

UpdateRowsEvent actualEvent = (UpdateRowsEvent) event;

long tableId = actualEvent.getTableId();

//TODO,这里可以获取更新事件信息,可做其它逻辑处理

logger.info("更新事件ID:{}",tableId);

} else if (eventType == DeleteRowsEvent.class) {// 删除事件

DeleteRowsEvent actualEvent = (DeleteRowsEvent) event;

long tableId = actualEvent.getTableId();

//TODO,这里可以获取删除事件信息,可做其它逻辑处理

logger.info("删除事件ID:{}",tableId);

} else if (eventType == XidEvent.class) {// 结束事务

XidEvent actualEvent = (XidEvent) event;

long xId = actualEvent.getXid();

//TODO,这里可以获取结束事件信息,可做其它逻辑处理

logger.info("结束事件ID:{}",xId);

}

}

}

}

(3)MySQL binlog分析程序测试类(OpenReplicatorTest.java)

package com.mcc.core.test; import com.mcc.core.openReplicator.AutoOpenReplicator; import com.mcc.core.openReplicator.NotificationListener; /**  * MySQL binlog分析程序测试  *  * @author menergy  *         DateTime: 13-12-26  下午2:26  */ public class OpenReplicatorTest {     public static void main(String args[]){         // 配置从MySQL Master进行复制         final AutoOpenReplicator aor = new AutoOpenReplicator();         aor.setServerId(100001);         aor.setHost("192.168.1.1");         aor.setUser("admin");         aor.setPassword("123456");         aor.setAutoReconnect(true);         aor.setDelayReconnect(5);         aor.setBinlogEventListener(new NotificationListener());         aor.start();     } }

binlog流程 mysql_MySQL binlog分析程序:Open Replicator相关推荐

  1. LS-DYNA (动力分析程序)

    LS-DYNA 是世界上最着名的通用显式动力分析程序,能够模拟真实世界的各种复杂问题,特别适合求解各种二维.三维非线性结构的高速碰撞.爆炸和金属成型等非线性动力冲击问题,同时可以求解传热.流体及流固耦 ...

  2. python唐诗分析综合_全唐诗分析程序

    全唐诗分析程序 这个程序最初的诞生是为了写微信公众号的两篇文章,那两篇文章的也大致讲解了程序的原理和流程. 因此,在使用程序之前,强烈建议您先读这两篇文章: 相应的,程序也主要有两个方面的功能: 分析 ...

  3. 用dotTace模仿下老赵的“使用Profiler分析程序性能”

    最近看到老赵博客"使用Profiler分析程序性能"(http://www.cnblogs.com/JeffreyZhao/archive/2009/12/22/profiler- ...

  4. 【Android 插件化】Hook 插件化框架 ( Hook Activity 启动流程 | Hook 点分析 )

    Android 插件化系列文章目录 [Android 插件化]插件化简介 ( 组件化与插件化 ) [Android 插件化]插件化原理 ( JVM 内存数据 | 类加载流程 ) [Android 插件 ...

  5. 宝付分析程序员怎么提升自己

    宝付分析程序员怎么提升自己.相信我们做程序员的都听说"青春饭"这两个字,其实宝付认为程序员的职业生涯和年龄并没有太直接的原因,重要的还是个人的规划,今天宝付就给大家分享一下,程序员 ...

  6. 【ABAP】通过ST05分析程序执行路径

    在系统维护中,经常需要对用户自定义开发的程序以及系统的标准程序进行分析,需要知道程序执行中有哪些表被调用,执行了哪些操作.SAP提供了性能分析工具ST05,能够对程序执行中的操作进行跟踪.下面介绍如何 ...

  7. 用70行代码实现日志分析程序​

    python又一力作,感受python的强大.用70行代码实现日志分析程序 功能介绍:可直接对文本日至进行分组和排序功能,完了输出结果粘贴到excel里就可以直接生成图表,对于排查一些生产环境问题有很 ...

  8. 支付宝app支付java后台流程、原理分析(含nei wang chuan tou)

    java版支付宝app支付流程及原理分析 本实例是基于springmvc框架编写      一.流程步骤          1.执行流程            当手机端app(就是你公司开发的app) ...

  9. python解zuobiaoxi方程_滑坡稳定性分析程序初探---Python版!

    0 前言 山体滑坡是常见的自然灾害,从理论分析的角度讲,滑坡的稳定性分析方法源自于高中物理学,如图1所示.前者的滑动分析非常简单,在已知滑块的重量以及接触面摩擦系数的基础上通过计算下滑力和抗滑力的关系 ...

最新文章

  1. Android - HttpURLConnection 抛出异常
  2. 【JavaSE02】Java基本语法-练习
  3. Http响应码及其含义--摘自apache官网
  4. 【Linux】一步一步学Linux——whatis命令(14)
  5. Ansible Synchronize
  6. Substance PBR Guide
  7. 集合与数组,集合与集合之间的转换
  8. JavaScript表单验证示例
  9. JSP面试题都在这里 1
  10. 如果U盘中了文件夹隐藏病毒,怎么办?
  11. Xmapp安装配置和连接MySQL
  12. 【python】爬取元素周期表的元素信息
  13. 工业大数据竞赛的轴承数据集
  14. python的复数的实部虚部都是浮点数吗_python中复数的共轭复数知识点总结
  15. 低成本打造初创团队的 DevOps 实践(采用 NAS中的KVM 承载 Gogs + Jenkins + Nexus 服务)【0x02】安装Nexus
  16. 慧荣SM2246EN开卡Toggle 8贴东芝闪存SSD失败解决方法
  17. 谷歌移动端seo优化如何做
  18. 5G NR学习理解系列——MATLAB5G信源的生成之SSB参数配置
  19. Windows更新导致AMD Radeon Software等软件无法正常启动
  20. 美国商会呼吁对ICO进行澄清

热门文章

  1. lsnrctl command not found
  2. 25 条客户服务名言激励您的团队
  3. win10用OBS录制视频出现视频文件音画不同步的情况处理
  4. 【ABAP系列】SAP ABAP smartforms设备类型CNSAPWIN不支持页格式ZXXX
  5. 一牛网:最新6月手机综合性能评测
  6. android开发—什么是ADB?
  7. 数理统计复习:统计量及其分布(3)充分统计量
  8. 基于CNN实现垃圾分类案例
  9. 苹果终端date命令_mac终端常用命令
  10. 价格搜索上首页应该注意哪些?