TM和RM初始化过程

上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。

本篇文章只分析TM的初始化过程,RM和TM复用了很多方法

// TmNettyRemotingClient
public void init() {// registry processor// 注册消息处理器registerProcessor();if (initialized.compareAndSet(false, true)) {super.init();}
}
// AbstractNettyRemotingClient
public void init() {// 不断连接seata servertimerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);// 是否允许批量发送请求if (NettyClientConfig.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}// 移除发送超时的消息super.init();clientBootstrap.start();
}

clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了

建立和TC的连接

TM和RM每隔10s都要TC集群的每个地址建立长连接

// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {List<String> availList = null;try {// 获得事务分组对应的集群中每台机器地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}// 遍历tc服务器地址for (String serverAddress : availList) {try {// 建立与tc的连接acquireChannel(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}
}
Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);// 与当前serverAddress已经存在连接,直接返回if (channelToServer != null) {channelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to " + serverAddress);}// 与当前serverAddress不存在连接,新建连接Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());synchronized (lockObj) {return doConnect(serverAddress);}
}
private Channel doConnect(String serverAddress) {Channel channelToServer = channels.get(serverAddress);// 当前地址已经存在连接if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;
}


TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法

参考博客

[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877

seata源码解析:TM RM 客户端的初始化过程相关推荐

  1. Seata 源码分析 - tm、rm 中 xid 传递过程

    一.Seata 前面文章讲解了对 Seata 的 AT 和 TCC 模式的使用,本篇文章为大家讲解下 Seata 中 TM.RM 中 xid 传递过程,如果不了解 Seata 中的 xid,可以理解为 ...

  2. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

  3. Spring源码解析 -- SpringWeb请求映射Map初始化

    简介 在上篇文章中,大致解析了Spring如何将请求路径与处理方法进行映射,但映射相关的初始化对于我们来说还是一团迷雾 本篇文章就来探索下,请求路径和处理方法的映射,是如何进行初始化的 概览 基于上篇 ...

  4. Spring AOP源码解析-拦截器链的执行过程

    一.简介 在前面的两篇文章中,分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在得到了 bean 的代理对象,且通知也以合适的方式插在了目标方 ...

  5. android资源加载流程6,FrameWork源码解析(6)-AssetManager加载资源过程

    之前一段时间项目比较忙所以一直没有更新,接下来准备把插件化系列的文章写完,今天我们就先跳过ContentProvider源码解析来讲资源加载相关的知识,资源加载可以说是插件化非常重要的一环,我们很有必 ...

  6. springboot启动源码解析(三):初始化启动上下文、初始化监听器列表、发布开始启动事件

    此章节主要对springboot启动过程中,发生的[初始化启动上下文].[初始化监听器列表].[发布springboot开始启动事件]进行源码解析,对应的代码如图1所示: 图1: // 首先初始化一个 ...

  7. android项目源码解析04:新浪微博客户端源码解析

    本文主要介绍如何构建新浪微博客户端.以网上流传weiboSina源码为例介绍,其下载地址为: http://download.csdn.net/detail/ryzhanglu/3453875. 1. ...

  8. Tomcat源码解析三:tomcat的启动过程

    Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...

  9. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程

    文章目录 一.前言 二.准备undo log 0.undo log 样例 1)undo log表结构 2)rollback_info(回滚日志数据) 1.before image的构建 1)业务表元数 ...

最新文章

  1. 大物实验总结模板_高考化学实验题答题模板归类总结!
  2. double free or corruption (fasttop)
  3. 类和对象—对象特性—深拷贝与浅拷贝
  4. Java中的数据结构
  5. 一朵云、一张网、一体化 ——GRTN 打造最佳流媒体场景实践
  6. C++调用C#的dll
  7. C++编程中的四个调试小技巧
  8. Stateflow历史节点的使用
  9. debian9.4配置iso作为更新源
  10. 【5分钟 Paper】Deterministic Policy Gradient Algorithms
  11. 20155339 《信息安全系统设计基础》课程总结
  12. Andriod Recovery模式及ClockworkMod Recovery简介
  13. android mds文件,安卓手机如何打开.mdf文件
  14. 第16课 火眼金睛——人脸识别
  15. win10装kali linux双系统,win10安装kali组成双系统攻略
  16. 全自动共享软件破解器4.8
  17. MAE 论文逐段精读【论文精读】(深度学习论文篇)
  18. open judge 1.6.3
  19. 协议 驱动 接口 服务器,TCP/IP 协议底层驱动原理 (含网卡芯片读写) 说明 [撸 swoole 和 workerman 的同学建议瞧瞧]...
  20. windows安装vcpkg过程下载失败问题的解决方法

热门文章

  1. C/C++面试题-2 之2/2
  2. 2022年全国最新消防设施操作员模拟试题题库及答案
  3. 学生成绩管理系统总体设计
  4. matlab图源代码,[转载]常用的一些图像处理Matlab源代码
  5. 机器人公司都吓傻了?因为谷东AR入局了嘛!
  6. 2021-3-2打砖块游戏,轮播图,swiper,自执行函数
  7. Linux yum的在线安装(yum命令)
  8. yum-网络yum和本地yum
  9. Mysql安装(转自韩顺平教育)
  10. 单应性(Homography)变换