seata源码解析:TM RM 客户端的初始化过程
TM和RM初始化过程
上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。
本篇文章只分析TM的初始化过程,RM和TM复用了很多方法
// TmNettyRemotingClient
public void init() {// registry processor// 注册消息处理器registerProcessor();if (initialized.compareAndSet(false, true)) {super.init();}
}
// AbstractNettyRemotingClient
public void init() {// 不断连接seata servertimerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);// 是否允许批量发送请求if (NettyClientConfig.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}// 移除发送超时的消息super.init();clientBootstrap.start();
}
clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了
建立和TC的连接
TM和RM每隔10s都要TC集群的每个地址建立长连接
// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {List<String> availList = null;try {// 获得事务分组对应的集群中每台机器地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}// 遍历tc服务器地址for (String serverAddress : availList) {try {// 建立与tc的连接acquireChannel(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}
}
Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);// 与当前serverAddress已经存在连接,直接返回if (channelToServer != null) {channelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to " + serverAddress);}// 与当前serverAddress不存在连接,新建连接Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());synchronized (lockObj) {return doConnect(serverAddress);}
}
private Channel doConnect(String serverAddress) {Channel channelToServer = channels.get(serverAddress);// 当前地址已经存在连接if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;
}
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法
参考博客
[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877
seata源码解析:TM RM 客户端的初始化过程相关推荐
- Seata 源码分析 - tm、rm 中 xid 传递过程
一.Seata 前面文章讲解了对 Seata 的 AT 和 TCC 模式的使用,本篇文章为大家讲解下 Seata 中 TM.RM 中 xid 传递过程,如果不了解 Seata 中的 xid,可以理解为 ...
- 谷歌BERT预训练源码解析(三):训练过程
目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...
- Spring源码解析 -- SpringWeb请求映射Map初始化
简介 在上篇文章中,大致解析了Spring如何将请求路径与处理方法进行映射,但映射相关的初始化对于我们来说还是一团迷雾 本篇文章就来探索下,请求路径和处理方法的映射,是如何进行初始化的 概览 基于上篇 ...
- Spring AOP源码解析-拦截器链的执行过程
一.简介 在前面的两篇文章中,分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在得到了 bean 的代理对象,且通知也以合适的方式插在了目标方 ...
- android资源加载流程6,FrameWork源码解析(6)-AssetManager加载资源过程
之前一段时间项目比较忙所以一直没有更新,接下来准备把插件化系列的文章写完,今天我们就先跳过ContentProvider源码解析来讲资源加载相关的知识,资源加载可以说是插件化非常重要的一环,我们很有必 ...
- springboot启动源码解析(三):初始化启动上下文、初始化监听器列表、发布开始启动事件
此章节主要对springboot启动过程中,发生的[初始化启动上下文].[初始化监听器列表].[发布springboot开始启动事件]进行源码解析,对应的代码如图1所示: 图1: // 首先初始化一个 ...
- android项目源码解析04:新浪微博客户端源码解析
本文主要介绍如何构建新浪微博客户端.以网上流传weiboSina源码为例介绍,其下载地址为: http://download.csdn.net/detail/ryzhanglu/3453875. 1. ...
- Tomcat源码解析三:tomcat的启动过程
Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...
- 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程
文章目录 一.前言 二.准备undo log 0.undo log 样例 1)undo log表结构 2)rollback_info(回滚日志数据) 1.before image的构建 1)业务表元数 ...
最新文章
- 大物实验总结模板_高考化学实验题答题模板归类总结!
- double free or corruption (fasttop)
- 类和对象—对象特性—深拷贝与浅拷贝
- Java中的数据结构
- 一朵云、一张网、一体化 ——GRTN 打造最佳流媒体场景实践
- C++调用C#的dll
- C++编程中的四个调试小技巧
- Stateflow历史节点的使用
- debian9.4配置iso作为更新源
- 【5分钟 Paper】Deterministic Policy Gradient Algorithms
- 20155339 《信息安全系统设计基础》课程总结
- Andriod Recovery模式及ClockworkMod Recovery简介
- android mds文件,安卓手机如何打开.mdf文件
- 第16课 火眼金睛——人脸识别
- win10装kali linux双系统,win10安装kali组成双系统攻略
- 全自动共享软件破解器4.8
- MAE 论文逐段精读【论文精读】(深度学习论文篇)
- open judge 1.6.3
- 协议 驱动 接口 服务器,TCP/IP 协议底层驱动原理 (含网卡芯片读写) 说明 [撸 swoole 和 workerman 的同学建议瞧瞧]...
- windows安装vcpkg过程下载失败问题的解决方法