本文主要研究一下rocketmq-mysql的BinlogPositionManager

BinlogPositionManager

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);

private DataSource dataSource;

private Config config;

private String binlogFilename;

private Long nextPosition;

public BinlogPositionManager(Config config, DataSource dataSource) {

this.config = config;

this.dataSource = dataSource;

}

public void initBeginPosition() throws Exception {

if (config.startType == null || config.startType.equals("DEFAULT")) {

initPositionDefault();

} else if (config.startType.equals("NEW_EVENT")) {

initPositionFromBinlogTail();

} else if (config.startType.equals("LAST_PROCESSED")) {

initPositionFromMqTail();

} else if (config.startType.equals("SPECIFIED")) {

binlogFilename = config.binlogFilename;

nextPosition = config.nextPosition;

}

if (binlogFilename == null || nextPosition == null) {

throw new Exception("binlogFilename | nextPosition is null.");

}

}

//......

public String getBinlogFilename() {

return binlogFilename;

}

public Long getPosition() {

return nextPosition;

}

}

BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根据config.startType的类型来执行不同逻辑,DEFAULT的是执行initPositionDefault,NEW_EVENT是执行initPositionFromBinlogTail,LAST_PROCESSED是执行initPositionFromMqTail,SPECIFIED是设置指定的binlogFilename及nextPosition

initPositionDefault

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

//......

private void initPositionDefault() throws Exception {

try {

initPositionFromMqTail();

} catch (Exception e) {

logger.error("Init position from mq error.", e);

}

if (binlogFilename == null || nextPosition == null) {

initPositionFromBinlogTail();

}

}

//......

}

initPositionDefault执行的是initPositionFromMqTail方法,在binlogFilename或者nextPosition为null时还会执行initPositionFromBinlogTail

initPositionFromBinlogTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

//......

private void initPositionFromBinlogTail() throws SQLException {

String sql = "SHOW MASTER STATUS";

Connection conn = null;

ResultSet rs = null;

try {

Connection connection = dataSource.getConnection();

rs = connection.createStatement().executeQuery(sql);

while (rs.next()) {

binlogFilename = rs.getString("File");

nextPosition = rs.getLong("Position");

}

} finally {

if (conn != null) {

conn.close();

}

if (rs != null) {

rs.close();

}

}

}

//......

}

initPositionFromBinlogTail方法通过SHOW MASTER STATUS来获取最新的binlogFilename及nextPosition

initPositionFromMqTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

//......

private void initPositionFromMqTail() throws Exception {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");

consumer.setNamesrvAddr(config.mqNamesrvAddr);

consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));

consumer.start();

Set queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);

MessageQueue queue = queues.iterator().next();

if (queue != null) {

Long offset = consumer.maxOffset(queue);

if (offset > 0)

offset--;

PullResult pullResult = consumer.pull(queue, "*", offset, 100);

if (pullResult.getPullStatus() == PullStatus.FOUND) {

MessageExt msg = pullResult.getMsgFoundList().get(0);

String json = new String(msg.getBody(), "UTF-8");

JSONObject js = JSON.parseObject(json);

binlogFilename = (String) js.get("binlogFilename");

nextPosition = js.getLong("nextPosition");

}

}

}

//......

}

initPositionFromMqTail方法创建DefaultMQPullConsumer,然后从指定的topic拉取MessageQueue,然后通过consumer.maxOffset(queue)获取offset,再执行pull,找到第一个msg,然后设置binlogFilename及nextPosition

小结

BinlogPositionManager提供了initBeginPosition、getBinlogFilename、getPosition方法;其中initBeginPosition方法根据config.startType的类型来执行不同逻辑,DEFAULT的是执行initPositionDefault,NEW_EVENT是执行initPositionFromBinlogTail,LAST_PROCESSED是执行initPositionFromMqTail,SPECIFIED是设置指定的binlogFilename及nextPosition

rocketmq mysql_聊聊rocketmq-mysql的BinlogPositionManager相关推荐

  1. c语言连接mysql_聊聊数据库MySQL、SqlServer、Oracle的区别,哪个更适合你?

    一.MySQL 优点: 体积小.速度快.总体拥有成本低,开源: 支持多种操作系统: 是开源数据库,提供的接口支持多种语言连接操作 : MySQL的核心程序采用完全的多线程编程.线程是轻量级的进程,它可 ...

  2. 聊聊rocketmq的ProducerImpl

    序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...

  3. 聊聊rocketmq的RemotingException

    序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...

  4. 聊聊rocketmq的BrokerHousekeepingService

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...

  5. 与顶级互联网公司技术大佬面对面聊聊RocketMQ

    作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...

  6. 聊聊rocketmq的ConsumerManageProcessor

    序 本文主要研究一下rocketmq的ConsumerManageProcessor NettyRequestProcessor rocketmq-all-4.6.0-source-release/r ...

  7. 聊聊rocketmq的ConsumerIdsChangeListener

    序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...

  8. 聊聊rocketmq的FileAppender

    序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...

  9. 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!

    大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...

最新文章

  1. Tomcat参数配置
  2. 计算机四级嵌入式真题,2014年3月计算机四级嵌入式工程师真题试题及答案
  3. Java EE之RMI
  4. 如何修改dedecms专题目录默认名称special
  5. 用Python画一只蝙蝠
  6. 香帅的北大金融学课笔记10 -- 金融衍生品
  7. mysql慢日志管理
  8. Linux文件系统基础(1)
  9. 学成在线案例——黑马程序员pink老师\思路讲解\完整源代码
  10. Android 功耗(20)---Android后台调度与省电
  11. 一代经典框架 Layui 落幕,是否预示着一个时代的结束?
  12. python调用soap_如何在python zeep中调用soap api而不使用wsdl(非wsdl模式)?使用用户和密码身份验证调用位置URL...
  13. 解析大型.NET ERP系统 业务逻辑设计与实现
  14. 计算机基础知识及键盘熟悉实验报告,实验报告-实验一计算机的认识与指法练习.doc...
  15. sql server 2008 的数据类型大全
  16. 离线ROS API文档(Zeal或Dash)
  17. 全景图(三):在Unity3D上实现360°球面投影
  18. python哥德巴赫猜想_Python验证哥德巴赫猜想
  19. uniapp微信小程序授权登录和获取微信绑定的手机号码
  20. 小游戏如何带动直播平台发展?

热门文章

  1. php异业联盟平台源码,基于ThinkPHP5.1框架开发的B2C在线商城系统PHP源码
  2. C++中INT与BYTE相互转换
  3. 十三周二次课(6月20日)
  4. c++ 静态类成员函数(static member function) vs 名字空间 (namespace)
  5. Postgresql的character varying = bytea问题
  6. IE下easyui 缓存问题
  7. 努力只是因为想去做想做的事
  8. VC++ 6.0 快捷键大全
  9. 完全开源的杀病毒软件ClamAV
  10. 通过jQuery的attr修改onclick