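Open Replicator is a MySQL binlog parser written in Java. It first connects to MySQL (just as an ordinary MySQL slave would), then receives and parses the binlog, and finally delivers the parsed binlog events to the application through callbacks. Open Replicator suits scenarios such as real-time push of MySQL data changes and multi-master-to-single-slave data synchronization. It currently supports only MySQL 5.0 and later.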

Open Replicator project page: https://github.com/whitesock/open-replicator

[Figure: binlog event parsing structure diagram]

Before reading on, you need a basic understanding of the binlog; see "MySQL Binlog解析" (MySQL Binlog Parsing).

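Here open-replicator is used to parse binlog events (binlog-format = row). The binlog contains two kinds of operations, DDL and DML: for DDL the output is a single SQL statement, and for DML it is the affected row data. Examples follow: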

DDL (CREATE, ALTER, DROP, TRUNCATE; mainly used to define or change table structure):

{
  "eventId": 1,
  "databaseName": "canal_test",
  "tableName": "`company`",
  "eventType": 2,
  "timestamp": 1477033198000,
  "timestampReceipt": 1477033248780,
  "binlogName": "mysql-bin.000006",
  "position": 353,
  "nextPostion": 468,
  "serverId": 2,
  "before": null,
  "after": null,
  "isDdl": true,
  "sql": "DROP TABLE `company` /* generated by server */"
}

DML (SELECT, UPDATE, INSERT, DELETE; operates on the data in the database; only the statements that change data appear in the binlog):

{
  "eventId": 0,
  "databaseName": "canal_test",
  "tableName": "person",
  "eventType": 24,
  "timestamp": 1477030734000,
  "timestampReceipt": 1477032161988,
  "binlogName": "mysql-bin.000006",
  "position": 242,
  "nextPostion": 326,
  "serverId": 2,
  "before": {
    "id": "3",
    "sex": "f",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "after": {
    "id": "3",
    "sex": "m",
    "address": "shanghai",
    "age": "23",
    "name": "zzh3"
  },
  "isDdl": false,
  "sql": null
}
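The eventType field in both samples is the numeric type code from the binlog event header. As a reading aid, the sketch below maps a few well-known codes to names; 2 (QUERY_EVENT) and 24 (UPDATE_ROWS_EVENT in the v1 row format) are the two that appear in the samples. The class name BinlogEventTypeNames is made up for illustration, and the table is deliberately incomplete.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: maps a subset of binlog event type codes to names. */
final class BinlogEventTypeNames {
    private static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(2, "QUERY_EVENT");
        NAMES.put(15, "FORMAT_DESCRIPTION_EVENT");
        NAMES.put(16, "XID_EVENT");
        NAMES.put(19, "TABLE_MAP_EVENT");
        NAMES.put(23, "WRITE_ROWS_EVENT");
        NAMES.put(24, "UPDATE_ROWS_EVENT");
        NAMES.put(25, "DELETE_ROWS_EVENT");
    }

    /** Returns the name for a known code, or UNKNOWN(code) otherwise. */
    static String nameOf(int eventType) {
        String name = NAMES.get(eventType);
        return name != null ? name : "UNKNOWN(" + eventType + ")";
    }

    public static void main(String[] args) {
        System.out.println(nameOf(2));   // prints QUERY_EVENT
        System.out.println(nameOf(24));  // prints UPDATE_ROWS_EVENT
    }
}
```

In the listener itself the same codes are available as constants on MySQLConstants, so a lookup like this is mainly useful when reading serialized events.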

The relevant class files are as follows:

CDCEvent.java

package or;

import java.util.Map;

import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.BinlogEventV4Header;

import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {

    private long eventId = 0; // unique event identifier
    private String databaseName = null;
    private String tableName = null;
    private int eventType = 0; // event type
    private long timestamp = 0; // time the event occurred [MySQL server time]
    private long timestampReceipt = 0; // time open-replicator received the event [CDC processing time]
    private String binlogName = null; // binlog file name
    private long position = 0;
    private long nextPostion = 0;
    private long serverId = 0;
    private Map<String, String> before = null;
    private Map<String, String> after = null;
    private Boolean isDdl = null;
    private String sql = null;

private static AtomicLong uuid = new AtomicLong(0);

public CDCEvent(){}

public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){

this.init(are);

this.databaseName = databaseName;

this.tableName = tableName;

}

private void init(final BinlogEventV4 be){

this.eventId = uuid.getAndAdd(1);

BinlogEventV4Header header = be.getHeader();

this.timestamp = header.getTimestamp();

this.eventType = header.getEventType();

this.serverId = header.getServerId();

this.timestampReceipt = header.getTimestampOfReceipt();

this.position = header.getPosition();

this.nextPostion = header.getNextPosition();

this.binlogName = header.getBinlogFileName();

}

@Override

public String toString(){

StringBuilder builder = new StringBuilder();

builder.append("{ eventId:").append(eventId);

builder.append(",databaseName:").append(databaseName);

builder.append(",tableName:").append(tableName);

builder.append(",eventType:").append(eventType);

builder.append(",timestamp:").append(timestamp);

builder.append(",timestampReceipt:").append(timestampReceipt);

builder.append(",binlogName:").append(binlogName);

builder.append(",position:").append(position);

builder.append(",nextPostion:").append(nextPostion);

builder.append(",serverId:").append(serverId);

builder.append(",isDdl:").append(isDdl);

builder.append(",sql:").append(sql);

builder.append(",before:").append(before);

builder.append(",after:").append(after).append("}");

return builder.toString();

}

// Getters and setters omitted

}

open-replicator delivers parsed events through a registered listener; the most important steps of the whole process are in the class below:

InstanceListener.java

package or;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.keeper.TableInfoKeeper;

import or.manager.CDCEventManager;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.impl.event.DeleteRowsEvent;

import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;

import com.google.code.or.binlog.impl.event.QueryEvent;

import com.google.code.or.binlog.impl.event.TableMapEvent;

import com.google.code.or.binlog.impl.event.UpdateRowsEvent;

import com.google.code.or.binlog.impl.event.WriteRowsEvent;

import com.google.code.or.binlog.impl.event.XidEvent;

import com.google.code.or.common.glossary.Column;

import com.google.code.or.common.glossary.Pair;

import com.google.code.or.common.glossary.Row;

import com.google.code.or.common.util.MySQLConstants;

public class InstanceListener implements BinlogEventListener{

private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

@Override

public void onEvents(BinlogEventV4 be) {

if(be == null){

logger.error("binlog event is null");

return;

}

int eventType = be.getHeader().getEventType();

switch(eventType){

case MySQLConstants.FORMAT_DESCRIPTION_EVENT:

{

logger.trace("FORMAT_DESCRIPTION_EVENT");

break;

}

case MySQLConstants.TABLE_MAP_EVENT: // every ROW_EVENT is preceded by a TABLE_MAP_EVENT that carries table information such as tableId, tableName and databaseName; the ROW_EVENT itself only carries the tableId

{

TableMapEvent tme = (TableMapEvent)be;

TableInfoKeeper.saveTableIdMap(tme);

logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());

break;

}

case MySQLConstants.DELETE_ROWS_EVENT:

{

DeleteRowsEvent dre = (DeleteRowsEvent) be;

long tableId = dre.getTableId();

logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List<Row> rows = dre.getRows();

for(Row row:rows){

List<Column> before = row.getColumns();

Map<String, String> beforeMap = getMap(before,databaseName,tableName);

if(beforeMap !=null && beforeMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.UPDATE_ROWS_EVENT:

{

UpdateRowsEvent upe = (UpdateRowsEvent)be;

long tableId = upe.getTableId();

logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List<Pair<Row>> rows = upe.getRows();

for(Pair<Row> p:rows){

List<Column> colsBefore = p.getBefore().getColumns();

List<Column> colsAfter = p.getAfter().getColumns();

Map<String, String> beforeMap = getMap(colsBefore,databaseName,tableName);

Map<String, String> afterMap = getMap(colsAfter,databaseName,tableName);

if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.WRITE_ROWS_EVENT:

{

WriteRowsEvent wre = (WriteRowsEvent)be;

long tableId = wre.getTableId();

logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List<Row> rows = wre.getRows();

for(Row row: rows){

List<Column> after = row.getColumns();

Map<String, String> afterMap = getMap(after,databaseName,tableName);

if(afterMap!=null && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.QUERY_EVENT:

{

QueryEvent qe = (QueryEvent)be;

TableInfo tableInfo = createTableInfo(qe);

if(tableInfo == null)

break;

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);

cdcEvent.setIsDdl(true);

cdcEvent.setSql(qe.getSql().toString());

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

break;

}

case MySQLConstants.XID_EVENT:{

XidEvent xe = (XidEvent)be;

logger.trace("XID_EVENT: xid:{}",xe.getXid());

break;

}

default:

{

logger.trace("DEFAULT:{}",eventType);

break;

}

}

}

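    /**
     * A ROW_EVENT carries no column metadata, so the column names are read from MySQL
     * through MysqlConnection (described later) and matched by position against the
     * returned list of column values.
     *
     * @param cols column values of one row
     * @param databaseName database the row belongs to
     * @param tableName table the row belongs to
     * @return a map from column name to value, or null if the columns cannot be matched
     */
    private Map<String, String> getMap(List<Column> cols, String databaseName, String tableName){
        if(cols == null || cols.size()==0){
            return null;
        }
        String fullName = databaseName+"."+tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
        if(columnInfoList == null)
            return null;
        if(columnInfoList.size() != cols.size()){
            // the table structure may have changed: reload the metadata and look it up again
            TableInfoKeeper.refreshColumnsMap();
            columnInfoList = TableInfoKeeper.getColumns(fullName);
            if(columnInfoList == null || columnInfoList.size() != cols.size())
            {
                logger.warn("columnInfoList.size is not equal to cols.");
                return null;
            }
        }
        Map<String, String> map = new HashMap<>();
        for(int i=0;i<cols.size();i++){
            if(cols.get(i).getValue()==null)
                map.put(columnInfoList.get(i).getName(),"");
            else
                map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
        }
        return map;
    }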

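    /**
     * Extracts the table information from the SQL text. A QUERY_EVENT is scoped to a
     * DATABASE, unlike a ROW_EVENT, which is scoped to a TABLE, so the table name has
     * to be parsed out of the SQL and wrapped in a TableInfo object.
     *
     * @param qe the query event
     * @return the table information, or null if no table name can be found
     */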

private TableInfo createTableInfo(QueryEvent qe){

String sql = qe.getSql().toString().toLowerCase();

TableInfo ti = new TableInfo();

String databaseName = qe.getDatabaseName().toString();

String tableName = null;

if(checkFlag(sql,"table")){

tableName = getTableName(sql,"table");

} else if(checkFlag(sql,"truncate")){

tableName = getTableName(sql,"truncate");

} else{

logger.warn("can not find table name from sql:{}",sql);

return null;

}

ti.setDatabaseName(databaseName);

ti.setTableName(tableName);

ti.setFullName(databaseName+"."+tableName);

return ti;

}

private boolean checkFlag(String sql, String flag){

String[] ss = sql.split(" ");

for(String s:ss){

if(s.equals(flag)){

return true;

}

}

return false;

}

private String getTableName(String sql, String flag){

String[] ss = sql.split("\\.");

String tName = null;

if (ss.length > 1) {

String[] strs = ss[1].split(" ");

tName = strs[0];

} else {

String[] strs = sql.split(" ");

boolean start = false;

for (String s : strs) {

if (s.indexOf(flag) >= 0) {

start = true;

continue;

}

if (start && !s.isEmpty()) {

tName = s;

break;

}

}

}

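        if(tName == null){
            return null;
        }
        // Strings are immutable, so the cleaned-up value must be assigned back
        tName = tName.replace("`", "").replace(";", "");
        // strip a trailing "(" and anything after it, e.g. [create table person(....]
        int index = tName.indexOf('(');
        if(index>0){
            tName = tName.substring(0, index);
        }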

return tName;

}

}
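Extracting the table name by splitting on spaces and dots is sensitive to spacing, backticks and clauses such as IF EXISTS. As an alternative, the sketch below does the same job with a single regular expression. It is not part of the original listener: the class and method names are hypothetical, and it only handles names made of letters, digits and underscores.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical alternative to getTableName: regex-based table name extraction. */
final class DdlTableNameSketch {
    // Matches "... TABLE [IF [NOT] EXISTS] [db.]name" and "TRUNCATE [TABLE] [db.]name",
    // with optional backticks around the identifiers. Group 1 is the table name.
    private static final Pattern TABLE_NAME = Pattern.compile(
        "(?i)\\b(?:truncate(?:\\s+table)?|table)\\s+"
        + "(?:if\\s+(?:not\\s+)?exists\\s+)?"
        + "(?:`?\\w+`?\\s*\\.\\s*)?"
        + "`?(\\w+)`?");

    /** Returns the table name without backticks, or null if the SQL names no table. */
    static String extractTableName(String sql) {
        if (sql == null) {
            return null;
        }
        Matcher m = TABLE_NAME.matcher(sql);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractTableName("DROP TABLE `company` /* generated by server */"));
        System.out.println(extractTableName("create table person(id int)"));
        System.out.println(extractTableName("TRUNCATE TABLE canal_test.person"));
        System.out.println(extractTableName("BEGIN"));
    }
}
```

Unlike the listener, this version does not lowercase the statement first, so the table name keeps its original case.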

TableInfo.java, referenced above, is as follows:

package or.model;

public class TableInfo {

private String databaseName;

private String tableName;

private String fullName;

// Getters and setters omitted

@Override

public boolean equals(Object o){

if(this == o)

return true;

if(o == null || this.getClass()!=o.getClass())

return false;

TableInfo tableInfo = (TableInfo)o;

if(!this.databaseName.equals(tableInfo.getDatabaseName()))

return false;

if(!this.tableName.equals(tableInfo.getTableName()))

return false;

if(!this.fullName.equals(tableInfo.getFullName()))

return false;

return true;

}

@Override

public int hashCode(){

int result = this.tableName.hashCode();

result = 31*result+this.databaseName.hashCode();

result = 31*result+this.fullName.hashCode();

return result;

}

}

Next, the information extracted from TABLE_MAP_EVENT needs to be stored somewhere; that is the job of TableInfoKeeper.java:

package or.keeper;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

private static Map<Long, TableInfo> tabledIdMap = new ConcurrentHashMap<>();

private static Map<String, List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();

static{

columnsMap = MysqlConnection.getColumns();

}

public static void saveTableIdMap(TableMapEvent tme){

long tableId = tme.getTableId();

tabledIdMap.remove(tableId);

TableInfo table = new TableInfo();

table.setDatabaseName(tme.getDatabaseName().toString());

table.setTableName(tme.getTableName().toString());

table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());

tabledIdMap.put(tableId, table);

}

public static synchronized void refreshColumnsMap(){

Map<String, List<ColumnInfo>> map = MysqlConnection.getColumns();

if(map.size()>0){

// logger.warn("refresh and clear cols.");

columnsMap = map;

// logger.warn("refresh and switch cols:{}",map);

}

else

{

logger.error("refresh columnsMap error.");

}

}

public static TableInfo getTableInfo(long tableId){

return tabledIdMap.get(tableId);

}

public static List<ColumnInfo> getColumns(String fullName){

return columnsMap.get(fullName);

}

}
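The reason the keeper holds both maps is that a row event carries values by position only: the table id leads to a database and table name, and that name leads to the ordered column list. The pairing step can be sketched without a database or the open-replicator types. In the sketch below, plain strings and objects stand in for ColumnInfo and Column, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, dependency-free version of the name/value pairing done in getMap. */
final class RowMappingSketch {
    /** Pairs column names with row values by position; returns null on a count mismatch. */
    static Map<String, String> toNamedRow(List<String> columnNames, List<Object> values) {
        if (columnNames == null || values == null || columnNames.size() != values.size()) {
            return null;
        }
        // LinkedHashMap keeps the column order of the table definition
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            Object value = values.get(i);
            row.put(columnNames.get(i), value == null ? "" : value.toString());
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("id", "name", "age");
        List<Object> values = Arrays.<Object>asList(3, "zzh3", null);
        System.out.println(toNamedRow(names, values)); // prints {id=3, name=zzh3, age=}
    }
}
```

A count mismatch usually means the table was altered after the metadata was loaded, which is the case refreshColumnsMap exists for.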

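As mentioned in InstanceListener above, some information has to be read directly from MySQL, for example the column information of the tables. The class responsible, MysqlConnection, is as follows: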

package or;

import java.sql.Connection;

import java.sql.DatabaseMetaData;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.model.BinlogInfo;

import or.model.BinlogMasterStatus;

import or.model.ColumnInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MysqlConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private static Connection conn;

private static String host;

private static int port;

private static String user;

private static String password;

public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){

try {

if(conn == null || conn.isClosed()){

Class.forName("com.mysql.jdbc.Driver");

host = hostArg;

port = portArg;

user = userArg;

password = passwordArg;

conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);

// log the endpoint and user only; never write the password to the log
logger.info("connected to mysql {}:{} as user {}",host,port,user);

}

} catch (ClassNotFoundException e) {

logger.error(e.getMessage(),e);

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

public static Connection getConnection(){

try {

if(conn == null || conn.isClosed()){

setConnection(host,port,user,password);

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return conn;

}

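    /**
     * Reads the column information of every table.
     *
     * @return a map from "databaseName.tableName" to the ordered list of its columns
     */
    public static Map<String, List<ColumnInfo>> getColumns(){

    Map<String, List<ColumnInfo>> cols = new HashMap<>();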

Connection conn = getConnection();

try {

DatabaseMetaData metaData = conn.getMetaData();

ResultSet r = metaData.getCatalogs();

String tableType[] = {"TABLE"};

while(r.next()){

String databaseName = r.getString("TABLE_CAT");

ResultSet result = metaData.getTables(databaseName, null, null, tableType);

while(result.next()){

String tableName = result.getString("TABLE_NAME");

// System.out.println(result.getInt("TABLE_ID"));

String key = databaseName +"."+tableName;

ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);

cols.put(key, new ArrayList<ColumnInfo>());

while(colSet.next()){

ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));

cols.get(key).add(columnInfo);

}

}

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return cols;

}

/**

* Reference:

* mysql> show binary logs

* +------------------+-----------+

* | Log_name | File_size |

* +------------------+-----------+

* | mysql-bin.000001 | 126 |

* | mysql-bin.000002 | 126 |

* | mysql-bin.000003 | 6819 |

* | mysql-bin.000004 | 1868 |

* +------------------+-----------+

*/

public static List<BinlogInfo> getBinlogInfo(){

List<BinlogInfo> binlogList = new ArrayList<>();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show binary logs");

while(resultSet.next()){

BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));

binlogList.add(binlogInfo);

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogList;

}

/**

* Reference:

* mysql> show master status;

* +------------------+----------+--------------+------------------+

* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |

* +------------------+----------+--------------+------------------+

* | mysql-bin.000004 | 1868 | | |

* +------------------+----------+--------------+------------------+

*@return

*/

public static BinlogMasterStatus getBinlogMasterStatus(){

BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show master status");

while(resultSet.next()){

binlogMasterStatus.setBinlogName(resultSet.getString("File"));

binlogMasterStatus.setPosition(resultSet.getLong("Position"));

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogMasterStatus;

}

/**

* Reads the server_id of the MySQL server that open-replicator connects to.

*@return

*/

public static int getServerId(){

int serverId=6789;

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show variables like 'server_id'");

while(resultSet.next()){

serverId = resultSet.getInt("Value");

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return serverId;

}

}

The auxiliary classes used by the code above (BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java):

package or.model;

public class BinlogInfo {

private String binlogName;

private Long fileSize;

// Constructor BinlogInfo(String binlogName, Long fileSize), getters and setters omitted

}

package or.model;

public class BinlogMasterStatus {

private String binlogName;

private long position;

// Getters and setters omitted

}

package or.model;

public class ColumnInfo {

private String name;

private String type;

// Constructor ColumnInfo(String name, String type), getters and setters omitted

}

Finally, the parsed events need to be stored somewhere. To keep the design simple, a ConcurrentLinkedDeque is used (CDCEventManager.java):

package or.manager;

import java.util.concurrent.ConcurrentLinkedDeque;

import or.CDCEvent;

public class CDCEventManager {

public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();

}

With all the preparation done, the binlog can now be parsed:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import or.CDCEvent;

import or.InstanceListener;

import or.MysqlConnection;

import or.OpenReplicatorPlus;

import or.manager.CDCEventManager;

import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.google.gson.JsonElement;

import com.google.gson.JsonParser;

public class OpenReplicatorTest {

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);

private static final String host = "10.198.197.60";

private static final int port = 3306;

private static final String user = "****";

private static final String password = "****";

public static void main(String[] args){

OpenReplicator or = new OpenReplicator ();

or.setUser(user);

or.setPassword(password);

or.setHost(host);

or.setPort(port);

MysqlConnection.setConnection(host, port, user, password);

// or.setServerId(MysqlConnection.getServerId());

// the serverId configured here is the id of open-replicator itself (acting as a slave), not the master's serverId

BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();

or.setBinlogFileName(bms.getBinlogName());

// or.setBinlogFileName("mysql-bin.000004");

or.setBinlogPosition(4);

or.setBinlogEventListener(new InstanceListener());

try {

or.start();

} catch (Exception e) {

logger.error(e.getMessage(),e);

}

Thread thread = new Thread(new PrintCDCEvent());

thread.start();

}

public static class PrintCDCEvent implements Runnable{

@Override

public void run() {

while(true){

if(CDCEventManager.queue.isEmpty() == false)

{

CDCEvent ce = CDCEventManager.queue.pollFirst();

Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

String prettyStr1 = gson.toJson(ce);

System.out.println(prettyStr1);

}

else{

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

}
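PrintCDCEvent polls the deque and sleeps for a second whenever it is empty, so an event can wait up to a second before it is printed. One alternative, swapped in here for illustration and not what the listing does, is a BlockingQueue whose timed poll wakes the consumer as soon as an element arrives. The sketch below uses strings in place of CDCEvent; the class and method names are hypothetical, and the lambda assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer that blocks on the queue instead of sleeping and re-checking. */
final class BlockingConsumerSketch {
    /** Takes up to `expected` items, waiting at most timeoutMillis for each one. */
    static List<String> consume(BlockingQueue<String> queue, int expected, long timeoutMillis) {
        List<String> seen = new ArrayList<>();
        try {
            while (seen.size() < expected) {
                // returns as soon as an item is available, or null after the timeout
                String event = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (event == null) {
                    break;
                }
                seen.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            queue.offer("event-1");
            queue.offer("event-2");
        });
        producer.start();
        System.out.println(consume(queue, 2, 1000));
        producer.join();
    }
}
```

With this shape the listener would call offer where it currently calls addLast, and the consumer thread needs no sleep loop.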

After running for a long time, the program runs into the following problem:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog

java.io.EOFException: null

at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]

16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from 10.198.197.60:3306

A preliminary workaround (extend OpenReplicator and add a retry mechanism):

package or;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);

private volatile boolean autoRestart = true;

@Override

public void stopQuietly(long timeout, TimeUnit unit){

super.stopQuietly(timeout, unit);

if(autoRestart){

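try {

TimeUnit.SECONDS.sleep(10);

logger.error("Restart OpenReplicator");

// Assumption: the restart itself is a call to start(), which resumes from the
// binlog file name and position currently configured on this instance

this.start();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}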

}

}

}

Finally, in OpenReplicatorTest.java simply change OpenReplicator or = new OpenReplicator (); to OpenReplicator or = new OpenReplicatorPlus ();.
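The retry above waits a fixed 10 seconds. If the server stays unreachable for longer, a common refinement is to retry with a growing delay up to a cap. The sketch below shows that idea in isolation and does not touch the open-replicator API: the Callable stands in for a call such as or.start(), and the class name, method names and parameters are all hypothetical.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper: doubles the wait between attempts up to a maximum. */
final class RestartBackoffSketch {
    /** Delay before retrying after the given 0-based attempt, doubling up to maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Calls start until it succeeds; returns the number of attempts used, or -1 on failure. */
    static int startWithRetry(Callable<Void> start, int maxAttempts, long baseMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                start.call();
                return attempt + 1;
            } catch (Exception e) {
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // simulated start() that fails twice before succeeding
        int used = startWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.EOFException("simulated disconnect");
            }
            return null;
        }, 5, 10, 1000);
        System.out.println("succeeded after " + used + " attempts");
    }
}
```

Capping the number of attempts also gives the application a point at which to alert instead of retrying forever.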

All done.
