mysql replicator_聊聊rocketmq-mysql的Replicator
序
本文主要研究一下rocketmq-mysql的Replicator
Replicator
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
public class Replicator {
private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
private Config config;
private EventProcessor eventProcessor;
private RocketMQProducer rocketMQProducer;
private Object lock = new Object();
private BinlogPosition nextBinlogPosition;
private long nextQueueOffset;
private long xid;
public static void main(String[] args) {
Replicator replicator = new Replicator();
replicator.start();
}
public void start() {
try {
config = new Config();
config.load();
rocketMQProducer = new RocketMQProducer(config);
rocketMQProducer.start();
BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
binlogPositionLogThread.start();
eventProcessor = new EventProcessor(this);
eventProcessor.start();
} catch (Exception e) {
LOGGER.error("Start error.", e);
System.exit(1);
}
}
public void commit(Transaction transaction, boolean isComplete) {
String json = transaction.toJson();
for (int i = 0; i < 3; i++) {
try {
if (isComplete) {
long offset = rocketMQProducer.push(json);
synchronized (lock) {
xid = transaction.getXid();
nextBinlogPosition = transaction.getNextBinlogPosition();
nextQueueOffset = offset;
}
} else {
rocketMQProducer.push(json);
}
break;
} catch (Exception e) {
LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
}
}
}
public void logPosition() {
String binlogFilename = null;
long xid = 0L;
long nextPosition = 0L;
long nextOffset = 0L;
synchronized (lock) {
if (nextBinlogPosition != null) {
xid = this.xid;
binlogFilename = nextBinlogPosition.getBinlogFilename();
nextPosition = nextBinlogPosition.getPosition();
nextOffset = nextQueueOffset;
}
}
if (binlogFilename != null) {
POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",
xid, binlogFilename, nextPosition, nextOffset);
}
}
public Config getConfig() {
return config;
}
public BinlogPosition getNextBinlogPosition() {
return nextBinlogPosition;
}
}
Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset
RocketMQProducer
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
public class RocketMQProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);
private DefaultMQProducer producer;
private Config config;
public RocketMQProducer(Config config) {
this.config = config;
}
public void start() throws MQClientException {
producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
producer.setNamesrvAddr(config.mqNamesrvAddr);
producer.start();
}
public long push(String json) throws Exception {
LOGGER.debug(json);
Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
SendResult sendResult = producer.send(message);
return sendResult.getQueueOffset();
}
}
RocketMQProducer的start方法创建DefaultMQProducer并执行其start方法;其push方法则通过producer.send(message)发送消息,并返回sendResult.getQueueOffset()
BinlogPositionLogThread
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
public class BinlogPositionLogThread extends Thread {
private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);
private Replicator replicator;
public BinlogPositionLogThread(Replicator replicator) {
this.replicator = replicator;
setDaemon(true);
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Offset thread interrupted.", e);
}
replicator.logPosition();
}
}
}
BinlogPositionLogThread会定时执行replicator.logPosition()来打印position信息
小结
Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset
doc
Replicator
关于找一找教程网
本站文章仅代表作者观点,不代表本站立场,所有文章非营利性免费分享。
本站提供了软件编程、网站开发技术、服务器运维、人工智能等等IT技术文章,希望广大程序员努力学习,让我们用科技改变世界。
[聊聊rocketmq-mysql的Replicator]http://www.zyiz.net/tech/detail-137260.html
mysql replicator_聊聊rocketmq-mysql的Replicator相关推荐
- canal同步mysql数据到rocketmq集群
rockermq多主多从异步复制部署参考 canal github 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 ...
- 电商秒杀 Springboot + Redis +RocketMq +Mysql
项目地址 https://gitee.com/noah2021/miaosha 转载,亲测可用! 测试用例 在下订单之前需要先发布对应的商品用于在Redis中生成口令避免大量请求导致服务器崩溃~~ 发 ...
- mysql 树排序_从MySQL开始聊聊“树”结构 (上)
前言 嗨喽,大家好,我是CrazyCodes, 近一年写的文章,都是一些广度方面的思考,新的一年,在技术深度上也需要有更多的探索,感谢各位的持续支持! MySQL 先聊聊大家熟知的MySQL,我们使用 ...
- mysql安装注意步骤,mysql安装步骤
mysql安装步骤 1.在官网下载对应的压缩文件,放到本地文件夹下,解压缩. 2.配置Path环境变量:新增mysql的bin文件夹路径,C:\software\mysql-8.0.23-winx64 ...
- liunx上mysql源码安装mysql,搞定linux上MySQL编程(一):linux上源码安装MySQL
[版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet,文章仅供学习交流,请勿用于商业用途] 1. 首先下载源码包: ftp://ftp.jaist.ac.jp/pub/m ...
- is this mysql server_远程连接MySQL数据库报错:is not allowed to connect to this MYSQL server的解决办法...
1. 改表法. 可能是你的帐号不允许从远程登陆,只能在localhost.这个时候只要在localhost的那台电脑,登入MySQL后,更改 "mysql" 数据库里的 " ...
- linux 修改mysql root密码_Linux mysql如何更改root密码
说到root密码,很多人想到的是电脑系统的root账号密码,其实mysql也有root密码,那么在Linux系统中,mysql要如何修改root密码呢?特别是忘记了root密码要怎么办? 通过登录my ...
- tcmalloc mysql 缓存_Tcmalloc优化Mysql内存管理
实验环境: OS:Redhat 5.3 64bit Mysql:mysql 5.5.29 TCMalloc(Thread-Caching Malloc)与标准glibc库的malloc实现一样的功能, ...
- PHP mysql数据迁移,【MySQL】迁移数据目录php-php教程
move dir # mv /var/libmysql /mnt/data/ vi /etc/my.cnf [mysqld] // 服务器端 datadir=/mnt/data/mysql socke ...
最新文章
- Potocol Buffer详解
- python是用什么语言开发的-python是什么语言?哪些人适合学习Python?
- MyBatisPlus介绍入门以及项目集成MyBatisPlus
- ubuntu 配置url地址重定向协议
- .NET静态类的概念
- linux内存管理总结
- 用VMware GSX和W2K群集服务实现Exchange群集
- Dijkstral算法--单源最短路
- fireworks切图
- 为你的软件选择正确的许可证方案
- XJOI 3709 测测你的RP
- 遥感数据存储格式 ----BSQ、 BIL、BIP及相互转换
- 从互联网大厂跳槽到国企后,我发现没有一劳永逸的工作。。。
- 赵小楼《天道》《遥远的救世主》解读(71)客观逻辑与离相的渊源
- 尘埃落定!AI 大牛贾佳亚离开腾讯优图,创立思谋科技,投身差异化 AI 创业
- 【Python】基于机器学习的财务数据分析——识别财务造假
- Linux arping命令测试IP地址冲突
- java一竖,java 添加一个竖滚动条
- 联想超级计算机盈利,联想集团2019年营收 联想集团全年营收多少
- python int()函数详解
热门文章
- oracle 修改lsnrctl,lsnrctl oracle 监听器 命令行
- 给予Java初学者的学习路线建议
- 聊聊编程中的 “魔数”
- 飞腾FT2000/4 CPU UEFI开发 固件配置
- hankerrank 刷题二( Python 基础)
- 计算机网络 - (三)电脑如何获取到IP的
- 【电力电子技术DC-AC】三相SPWM逆变器Simulink仿真(设置死区时间)
- Repeater的 Items属性、Items里面的控件有几个?
- 查找代码文件中的非 ASCII 字符
- EasyExcel 实现批量合并单元格(支持自定义)