本文主要研究一下rocketmq-mysql的Replicator

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

public class Replicator {

private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);

private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");

private Config config;

private EventProcessor eventProcessor;

private RocketMQProducer rocketMQProducer;

private Object lock = new Object();

private BinlogPosition nextBinlogPosition;

private long nextQueueOffset;

private long xid;

public static void main(String[] args) {

Replicator replicator = new Replicator();

replicator.start();

}

public void start() {

try {

config = new Config();

config.load();

rocketMQProducer = new RocketMQProducer(config);

rocketMQProducer.start();

BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);

binlogPositionLogThread.start();

eventProcessor = new EventProcessor(this);

eventProcessor.start();

} catch (Exception e) {

LOGGER.error("Start error.", e);

System.exit(1);

}

}

public void commit(Transaction transaction, boolean isComplete) {

String json = transaction.toJson();

for (int i = 0; i < 3; i++) {

try {

if (isComplete) {

long offset = rocketMQProducer.push(json);

synchronized (lock) {

xid = transaction.getXid();

nextBinlogPosition = transaction.getNextBinlogPosition();

nextQueueOffset = offset;

}

} else {

rocketMQProducer.push(json);

}

break;

} catch (Exception e) {

LOGGER.error("Push error,retry:" + (i + 1) + ",", e);

}

}

}

public void logPosition() {

String binlogFilename = null;

long xid = 0L;

long nextPosition = 0L;

long nextOffset = 0L;

synchronized (lock) {

if (nextBinlogPosition != null) {

xid = this.xid;

binlogFilename = nextBinlogPosition.getBinlogFilename();

nextPosition = nextBinlogPosition.getPosition();

nextOffset = nextQueueOffset;

}

}

if (binlogFilename != null) {

POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",

xid, binlogFilename, nextPosition, nextOffset);

}

}

public Config getConfig() {

return config;

}

public BinlogPosition getNextBinlogPosition() {

return nextBinlogPosition;

}

}

Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

public class RocketMQProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);

private DefaultMQProducer producer;

private Config config;

public RocketMQProducer(Config config) {

this.config = config;

}

public void start() throws MQClientException {

producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");

producer.setNamesrvAddr(config.mqNamesrvAddr);

producer.start();

}

public long push(String json) throws Exception {

LOGGER.debug(json);

Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));

SendResult sendResult = producer.send(message);

return sendResult.getQueueOffset();

}

}

RocketMQProducer的start方法创建DefaultMQProducer并执行其start方法;其push方法则通过producer.send(message)发送消息,并返回sendResult.getQueueOffset()

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

public class BinlogPositionLogThread extends Thread {

private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);

private Replicator replicator;

public BinlogPositionLogThread(Replicator replicator) {

this.replicator = replicator;

setDaemon(true);

}

@Override

public void run() {

while (true) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

logger.error("Offset thread interrupted.", e);

}

replicator.logPosition();

}

}

}

BinlogPositionLogThread会定时执行replicator.logPosition()来打印position信息

小结

Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

doc

Replicator

关于找一找教程网

本站文章仅代表作者观点,不代表本站立场,所有文章非营利性免费分享。

本站提供了软件编程、网站开发技术、服务器运维、人工智能等等IT技术文章,希望广大程序员努力学习,让我们用科技改变世界。

[聊聊rocketmq-mysql的Replicator]http://www.zyiz.net/tech/detail-137260.html

mysql replicator_聊聊rocketmq-mysql的Replicator相关推荐

  1. canal同步mysql数据到rocketmq集群

    rockermq多主多从异步复制部署参考 canal github 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 ...

  2. 电商秒杀 Springboot + Redis +RocketMq +Mysql

    项目地址 https://gitee.com/noah2021/miaosha 转载,亲测可用! 测试用例 在下订单之前需要先发布对应的商品用于在Redis中生成口令避免大量请求导致服务器崩溃~~ 发 ...

  3. mysql 树排序_从MySQL开始聊聊“树”结构 (上)

    前言 嗨喽,大家好,我是CrazyCodes, 近一年写的文章,都是一些广度方面的思考,新的一年,在技术深度上也需要有更多的探索,感谢各位的持续支持! MySQL 先聊聊大家熟知的MySQL,我们使用 ...

  4. mysql安装注意步骤,mysql安装步骤

    mysql安装步骤 1.在官网下载对应的压缩文件,放到本地文件夹下,解压缩. 2.配置Path环境变量:新增mysql的bin文件夹路径,C:\software\mysql-8.0.23-winx64 ...

  5. liunx上mysql源码安装mysql,搞定linux上MySQL编程(一):linux上源码安装MySQL

    [版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet,文章仅供学习交流,请勿用于商业用途] 1. 首先下载源码包: ftp://ftp.jaist.ac.jp/pub/m ...

  6. is this mysql server_远程连接MySQL数据库报错:is not allowed to connect to this MYSQL server的解决办法...

    1. 改表法. 可能是你的帐号不允许从远程登陆,只能在localhost.这个时候只要在localhost的那台电脑,登入MySQL后,更改 "mysql" 数据库里的 " ...

  7. linux 修改mysql root密码_Linux mysql如何更改root密码

    说到root密码,很多人想到的是电脑系统的root账号密码,其实mysql也有root密码,那么在Linux系统中,mysql要如何修改root密码呢?特别是忘记了root密码要怎么办? 通过登录my ...

  8. tcmalloc mysql 缓存_Tcmalloc优化Mysql内存管理

    实验环境: OS:Redhat 5.3 64bit Mysql:mysql 5.5.29 TCMalloc(Thread-Caching Malloc)与标准glibc库的malloc实现一样的功能, ...

  9. PHP mysql数据迁移,【MySQL】迁移数据目录php-php教程

    move dir # mv /var/libmysql /mnt/data/ vi /etc/my.cnf [mysqld] // 服务器端 datadir=/mnt/data/mysql socke ...

最新文章

  1. Potocol Buffer详解
  2. python是用什么语言开发的-python是什么语言?哪些人适合学习Python?
  3. MyBatisPlus介绍入门以及项目集成MyBatisPlus
  4. ubuntu 配置url地址重定向协议
  5. .NET静态类的概念
  6. linux内存管理总结
  7. 用VMware GSX和W2K群集服务实现Exchange群集
  8. Dijkstral算法--单源最短路
  9. fireworks切图
  10. 为你的软件选择正确的许可证方案
  11. XJOI 3709 测测你的RP
  12. 遥感数据存储格式 ----BSQ、 BIL、BIP及相互转换
  13. 从互联网大厂跳槽到国企后,我发现没有一劳永逸的工作。。。
  14. 赵小楼《天道》《遥远的救世主》解读(71)客观逻辑与离相的渊源
  15. 尘埃落定!AI 大牛贾佳亚离开腾讯优图,创立思谋科技,投身差异化 AI 创业
  16. 【Python】基于机器学习的财务数据分析——识别财务造假
  17. Linux arping命令测试IP地址冲突
  18. java一竖,java 添加一个竖滚动条
  19. 联想超级计算机盈利,联想集团2019年营收 联想集团全年营收多少
  20. python int()函数详解

热门文章

  1. oracle 修改lsnrctl,lsnrctl oracle 监听器 命令行
  2. 给予Java初学者的学习路线建议
  3. 聊聊编程中的 “魔数”
  4. 飞腾FT2000/4 CPU UEFI开发 固件配置
  5. hankerrank 刷题二( Python 基础)
  6. 计算机网络 - (三)电脑如何获取到IP的
  7. 【电力电子技术DC-AC】三相SPWM逆变器Simulink仿真(设置死区时间)
  8. Repeater的 Items属性、Items里面的控件有几个?
  9. 查找代码文件中的非 ASCII 字符
  10. EasyExcel 实现批量合并单元格(支持自定义)