rocketmq mysql_聊聊rocketmq-mysql的BinlogPositionManager
序
本文主要研究一下rocketmq-mysql的BinlogPositionManager
BinlogPositionManager
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);
private DataSource dataSource;
private Config config;
private String binlogFilename;
private Long nextPosition;
public BinlogPositionManager(Config config, DataSource dataSource) {
this.config = config;
this.dataSource = dataSource;
}
public void initBeginPosition() throws Exception {
if (config.startType == null || config.startType.equals("DEFAULT")) {
initPositionDefault();
} else if (config.startType.equals("NEW_EVENT")) {
initPositionFromBinlogTail();
} else if (config.startType.equals("LAST_PROCESSED")) {
initPositionFromMqTail();
} else if (config.startType.equals("SPECIFIED")) {
binlogFilename = config.binlogFilename;
nextPosition = config.nextPosition;
}
if (binlogFilename == null || nextPosition == null) {
throw new Exception("binlogFilename | nextPosition is null.");
}
}
//......
public String getBinlogFilename() {
return binlogFilename;
}
public Long getPosition() {
return nextPosition;
}
}
BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根据config.startType的类型来执行不同逻辑,DEFAULT的是执行initPositionDefault,NEW_EVENT是执行initPositionFromBinlogTail,LAST_PROCESSED是执行initPositionFromMqTail,SPECIFIED是设置指定的binlogFilename及nextPosition
initPositionDefault
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
//......
private void initPositionDefault() throws Exception {
try {
initPositionFromMqTail();
} catch (Exception e) {
logger.error("Init position from mq error.", e);
}
if (binlogFilename == null || nextPosition == null) {
initPositionFromBinlogTail();
}
}
//......
}
initPositionDefault执行的是initPositionFromMqTail方法,在binlogFilename或者nextPosition为null时还会执行initPositionFromBinlogTail
initPositionFromBinlogTail
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
//......
private void initPositionFromBinlogTail() throws SQLException {
String sql = "SHOW MASTER STATUS";
Connection conn = null;
ResultSet rs = null;
try {
Connection connection = dataSource.getConnection();
rs = connection.createStatement().executeQuery(sql);
while (rs.next()) {
binlogFilename = rs.getString("File");
nextPosition = rs.getLong("Position");
}
} finally {
if (conn != null) {
conn.close();
}
if (rs != null) {
rs.close();
}
}
}
//......
}
initPositionFromBinlogTail方法通过SHOW MASTER STATUS来获取最新的binlogFilename及nextPosition
initPositionFromMqTail
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
public class BinlogPositionManager {
//......
private void initPositionFromMqTail() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
consumer.setNamesrvAddr(config.mqNamesrvAddr);
consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
consumer.start();
Set queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
MessageQueue queue = queues.iterator().next();
if (queue != null) {
Long offset = consumer.maxOffset(queue);
if (offset > 0)
offset--;
PullResult pullResult = consumer.pull(queue, "*", offset, 100);
if (pullResult.getPullStatus() == PullStatus.FOUND) {
MessageExt msg = pullResult.getMsgFoundList().get(0);
String json = new String(msg.getBody(), "UTF-8");
JSONObject js = JSON.parseObject(json);
binlogFilename = (String) js.get("binlogFilename");
nextPosition = js.getLong("nextPosition");
}
}
}
//......
}
initPositionFromMqTail方法创建DefaultMQPullConsumer,然后从指定的topic拉取MessageQueue,然后通过consumer.maxOffset(queue)获取offset,再执行pull,找到第一个msg,然后设置binlogFilename及nextPosition
小结
BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根据config.startType的类型来执行不同逻辑,DEFAULT的是执行initPositionDefault,NEW_EVENT是执行initPositionFromBinlogTail,LAST_PROCESSED是执行initPositionFromMqTail,SPECIFIED是设置指定的binlogFilename及nextPosition
rocketmq mysql_聊聊rocketmq-mysql的BinlogPositionManager相关推荐
- c语言连接mysql_聊聊数据库MySQL、SqlServer、Oracle的区别,哪个更适合你?
一.MySQL 优点: 体积小.速度快.总体拥有成本低,开源: 支持多种操作系统: 是开源数据库,提供的接口支持多种语言连接操作 : MySQL的核心程序采用完全的多线程编程.线程是轻量级的进程,它可 ...
- 聊聊rocketmq的ProducerImpl
序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...
- 聊聊rocketmq的RemotingException
序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...
- 聊聊rocketmq的BrokerHousekeepingService
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...
- 与顶级互联网公司技术大佬面对面聊聊RocketMQ
作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...
- 聊聊rocketmq的ConsumerManageProcessor
序 本文主要研究一下rocketmq的ConsumerManageProcessor NettyRequestProcessor rocketmq-all-4.6.0-source-release/r ...
- 聊聊rocketmq的ConsumerIdsChangeListener
序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...
- 聊聊rocketmq的FileAppender
序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...
- 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!
大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...
最新文章
- Tomcat参数配置
- 计算机四级嵌入式真题,2014年3月计算机四级嵌入式工程师真题试题及答案
- Java EE之RMI
- 如何修改dedecms专题目录默认名称special
- 用Python画一只蝙蝠
- 香帅的北大金融学课笔记10 -- 金融衍生品
- mysql慢日志管理
- Linux文件系统基础(1)
- 学成在线案例——黑马程序员pink老师\思路讲解\完整源代码
- Android 功耗(20)---Android后台调度与省电
- 一代经典框架 Layui 落幕,是否预示着一个时代的结束?
- python调用soap_如何在python zeep中调用soap api而不使用wsdl(非wsdl模式)?使用用户和密码身份验证调用位置URL...
- 解析大型.NET ERP系统 业务逻辑设计与实现
- 计算机基础知识及键盘熟悉实验报告,实验报告-实验一计算机的认识与指法练习.doc...
- sql server 2008 的数据类型大全
- 离线ROS API文档(Zeal或Dash)
- 全景图(三):在Unity3D上实现360°球面投影
- python哥德巴赫猜想_Python验证哥德巴赫猜想
- uniapp微信小程序授权登录和获取微信绑定的手机号码
- 小游戏如何带动直播平台发展?
热门文章
- php异业联盟平台源码,基于ThinkPHP5.1框架开发的B2C在线商城系统PHP源码
- C++中INT与BYTE相互转换
- 十三周二次课(6月20日)
- c++ 静态类成员函数(static member function) vs 名字空间 (namespace)
- Postgresql的character varying = bytea问题
- IE下easyui 缓存问题
- 努力只是因为想去做想做的事
- VC++ 6.0 快捷键大全
- 完全开源的杀病毒软件ClamAV
- 通过jQuery的attr修改onclick