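Preface:

For a detailed introduction to Seata, Alibaba's open-source distributed transaction framework, see the official website; it is not covered in detail here. This chapter only analyzes the source code of Seata's AT mode, and assumes that you have some familiarity with Seata or have successfully run the demo.

In Seata, a transaction is started by the TM role. On the side that initiates the overall transaction, we annotate the executing method with @GlobalTransactional to enable a global transaction; the annotation also carries some custom settings for that global transaction, as shown below: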

public @interface GlobalTransactional {

    /**
     * Timeout for the work executed under this global transaction, in milliseconds.
     *
     * @return timeoutMills in MILLISECONDS.
     */
    int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;

    /**
     * Name of the global transaction instance.
     *
     * @return Given name.
     */
    String name() default "";

    /**
     * Exception classes that trigger a rollback.
     */
    Class<? extends Throwable>[] rollbackFor() default {};

    /**
     * Exception class names that trigger a rollback.
     */
    String[] rollbackForClassName() default {};

    /**
     * Exception classes that do not trigger a rollback.
     */
    Class<? extends Throwable>[] noRollbackFor() default {};

    /**
     * Exception class names that do not trigger a rollback.
     */
    String[] noRollbackForClassName() default {};

    /**
     * Transaction propagation level; REQUIRED by default.
     */
    Propagation propagation() default Propagation.REQUIRED;
}

1: Seata client - TM (analysis based on a Spring Cloud project)

1.0: GlobalTransactionScanner

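Anyone who has looked at how Spring's @Transactional is implemented knows that it uses dynamic proxies to wrap the common actions (creating, committing, or rolling back a transaction) in an execution template. Many open-source frameworks are built the same way, for example the mapper annotations in MyBatis (@Update, @Select, and so on) and Feign calls in Spring Cloud. In IDEA we can search for the places where the @GlobalTransactional annotation is used, which leads to GlobalTransactionScanner.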
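The proxy-plus-template idea described above can be sketched with a plain JDK dynamic proxy. This is only an illustration under stated assumptions, not Seata's or Spring's implementation: DemoTransactional, OrderService, and the EVENTS list are hypothetical names invented for the sketch, and "begin", "commit", and "rollback" are just recorded as strings.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class ProxyTemplateSketch {

    /** Hypothetical stand-in for an annotation such as @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoTransactional {
    }

    interface OrderService {
        @DemoTransactional
        String create(String item);

        String query(String item);
    }

    static final class OrderServiceImpl implements OrderService {
        @Override
        public String create(String item) {
            if (item.isEmpty()) {
                throw new IllegalArgumentException("empty item");
            }
            return "created:" + item;
        }

        @Override
        public String query(String item) {
            return "found:" + item;
        }
    }

    /** Records the template steps so their order can be inspected. */
    static final List<String> EVENTS = new ArrayList<>();

    /** Wraps the target in a proxy that applies the template to annotated methods only. */
    static OrderService wrap(OrderService target) {
        return (OrderService) Proxy.newProxyInstance(
            OrderService.class.getClassLoader(),
            new Class<?>[] {OrderService.class},
            (proxy, method, args) -> {
                boolean transactional = method.isAnnotationPresent(DemoTransactional.class);
                if (transactional) {
                    EVENTS.add("begin");
                }
                try {
                    Object result = method.invoke(target, args);
                    if (transactional) {
                        EVENTS.add("commit");
                    }
                    return result;
                } catch (InvocationTargetException e) {
                    if (transactional) {
                        EVENTS.add("rollback");
                    }
                    // Rethrow the business exception rather than the reflection wrapper.
                    throw e.getCause();
                }
            });
    }

    public static void main(String[] args) {
        OrderService service = wrap(new OrderServiceImpl());
        System.out.println(service.create("book"));
        System.out.println(EVENTS);
    }
}
```

The business class never mentions begin, commit, or rollback; the proxy adds them around every annotated method, which is the same division of labour the rest of this chapter traces through Seata.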

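If you have read through the source of some Spring Boot starter projects (for example MapperScannerRegistrar in MyBatis), the name GlobalTransactionScanner already suggests that this class scans for the @GlobalTransactional annotation and builds proxies for the annotated methods (after all, @GlobalTransactional is applied to methods). Using IDEA's Find Usages again to look for references to GlobalTransactionScanner, we find that it is initialized by SeataAutoConfiguration in Seata's Spring starter project.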

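Let us return to GlobalTransactionScanner (the other roles of SeataAutoConfiguration are described later). GlobalTransactionScanner extends AbstractAutoProxyCreator and implements InitializingBean (runs after the bean has been initialized), ApplicationContextAware (provides the ApplicationContext object), and DisposableBean (runs when the bean is destroyed); these correspond to different stages of the Spring bean lifecycle. The callback that runs after the bean has been initialized, afterPropertiesSet, starts both the TM client and the RM client.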

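This raises a question: why start both the RM and the TM client? If a service only ever acts as a branch in the distributed transaction chain, that is, in the RM role and never as a TM, then starting the TM seems unnecessary. Doing so opens a connection channel between the TM and the TC, which is a waste of resources.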

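1.1: Client-side TM client

Seata 1.2 and 1.3 use different names for the class that manages the connection between the TM and the TC.

In 1.2 it is named TmRpcClient.

In 1.3 it was renamed TmNettyRemotingClient.

In both versions the core is the same: Netty is the infrastructure for remote network communication between services, so the 1.3 name TmNettyRemotingClient states the underlying implementation more directly. The rest of this article is based on 1.3, the latest version at the time of writing.

1.1.1: TmNettyRemotingClient (core class, the TM's remote-call client)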

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);
    private static volatile TmNettyRemotingClient instance;
    // Keep-alive time passed to the message thread pool
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    // Capacity of the thread pool's waiting queue
    private static final int MAX_QUEUE_SIZE = 2000;
    // Flag marking whether the client has been initialized
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // The configured applicationId, a unique id
    private String applicationId;
    private String transactionServiceGroup;

    @Override
    public void init() {
        // Register the processors for response messages
        registerProcessor();
        // Initialize only once
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }

    private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                  EventExecutorGroup eventExecutorGroup,
                                  ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
    }

    /**
     * Gets a TmNettyRemotingClient.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     * @return the instance
     */
    public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        // Obtain the TmNettyRemotingClient as a singleton
        TmNettyRemotingClient tmNettyRemotingClient = getInstance();
        tmNettyRemotingClient.setApplicationId(applicationId);
        tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmNettyRemotingClient;
    }

    /**
     * Gets the singleton, lazily initialized with double-checked locking.
     *
     * @return the instance
     */
    public static TmNettyRemotingClient getInstance() {
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // Define the thread pool
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

    /**
     * Sets application id.
     *
     * @param applicationId the application id
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * Sets transaction service group.
     *
     * @param transactionServiceGroup the transaction service group
     */
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    @Override
    public String getTransactionServiceGroup() {
        return transactionServiceGroup;
    }

    /**
     * Callback invoked when registration succeeds.
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register TM success. client version:{}, server version:{},channel:{}",
                registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(serverAddress, channel);
    }

    @Override
    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
                                  AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
        String errMsg = String.format(
            "register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s",
            registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);
        throw new FrameworkException(errMsg);
    }

    /**
     * Called when the bean is destroyed.
     */
    @Override
    public void destroy() {
        super.destroy();
        initialized.getAndSet(false);
        instance = null;
    }

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (severAddress) -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

    /**
     * Registers the processors that handle TC responses.
     */
    private void registerProcessor() {
        // 1. Register the processor that parses the responses returned by the TC over Netty
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. Register the processor for heartbeat messages
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
}

// init() in the parent class (AbstractNettyRemotingClient)
public void init() {
    // Schedule a periodic task (every 10s by default) that checks the channels between
    // the TM and the TC and reconnects any channel that has been dropped
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // transport.enableClientBatchSendRequest: whether the client merges transaction
    // message requests and sends them in batches; true by default
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        // Define the thread pool
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        // Run the MergedSendRunnable task, which merges requests and sends them
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // Start the Netty client bootstrap
    clientBootstrap.start();
}
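The periodic reconnect task in the parent init() can be sketched with the JDK scheduler alone. This is a minimal illustration under stated assumptions, not Seata's channel manager: the channel table, the connect helper, and the class name ReconnectSketch are hypothetical, and a "connection" here is just a boolean flag.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ReconnectSketch {

    /** Hypothetical channel table: server address mapped to a "connected" flag. */
    private final Map<String, Boolean> channels = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    void register(String address, boolean connected) {
        channels.put(address, connected);
    }

    boolean isConnected(String address) {
        return Boolean.TRUE.equals(channels.get(address));
    }

    /** Hypothetical connect call; a real client would open a network channel here. */
    private boolean connect(String address) {
        return true;
    }

    /** One reconnect pass: healthy channels are kept, dropped ones are connected again. */
    void reconnect() {
        channels.replaceAll((address, connected) -> connected ? connected : connect(address));
    }

    /** Runs reconnect() repeatedly at a fixed rate, as the parent init() does with its timer. */
    void start(long initialDelayMillis, long intervalMillis) {
        timer.scheduleAtFixedRate(this::reconnect, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        ReconnectSketch sketch = new ReconnectSketch();
        sketch.register("127.0.0.1:8091", false);
        sketch.start(0, 100);
        Thread.sleep(300);
        System.out.println(sketch.isConnected("127.0.0.1:8091"));
        sketch.stop();
    }
}
```

Keeping the reconnect pass as its own method means it can be called directly as well as from the timer, which makes the behaviour easy to check without waiting for a schedule to fire.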

NettyClientBootstrap wraps the Netty client. Its source is analyzed below:

public class NettyClientBootstrap implements RemotingBootstrap {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
    // Client-related configuration
    private final NettyClientConfig nettyClientConfig;
    // Netty bootstrap class
    private final Bootstrap bootstrap = new Bootstrap();
    // Netty worker group
    private final EventLoopGroup eventLoopGroupWorker;
    // Event executor group
    private EventExecutorGroup defaultEventExecutorGroup;
    // Flag marking whether the bootstrap has been initialized
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    private final NettyPoolKey.TransactionRole transactionRole;
    // Netty channel handlers
    private ChannelHandler[] channelHandlers;

    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                NettyPoolKey.TransactionRole transactionRole) {
        if (nettyClientConfig == null) {
            nettyClientConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        // NIO event loop group
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
                selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    /**
     * Sets channel handlers.
     *
     * @param handlers the handlers
     */
    protected void setChannelHandlers(final ChannelHandler... handlers) {
        if (handlers != null) {
            channelHandlers = handlers;
        }
    }

    /**
     * Add channel pipeline last.
     *
     * @param channel  the channel
     * @param handlers the handlers
     */
    private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
        if (channel != null && handlers != null) {
            channel.pipeline().addLast(handlers);
        }
    }

    @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                    nettyClientConfig.getClientWorkerThreads()));
        }
        // Initialize the Netty client and set its options
        this.bootstrap.group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }
        // Bind the default handlers and the custom handlers through the pipeline
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                /*
                 * IdleStateHandler is the channel idle-state handler; it detects how long
                 * the channel has been idle.
                 * readerIdleTimeSeconds: read timeout (how long no data has been received from the peer)
                 * writerIdleTimeSeconds: write timeout (how long no data has been sent to the peer)
                 * allIdleTimeSeconds: timeout for either kind of activity (read or write)
                 * Depending on these parameters IdleStateHandler starts different timed tasks that check,
                 * at the configured interval, whether channelRead() has been called. If it has not,
                 * the userEventTriggered method of the following handlers is invoked to take some
                 * action (for example closing the connection).
                 */
                pipeline.addLast(
                    new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    // Set the decoder and encoder
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });
        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }

    @Override
    public void shutdown() {
        try {
            // Release the Netty network resources
            this.eventLoopGroupWorker.shutdownGracefully();
            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();
            }
        } catch (Exception exx) {
            LOGGER.error("Failed to shutdown: {}", exx.getMessage());
        }
    }

    /**
     * Gets a new channel; the channel is the network channel to the TC.
     *
     * @param address the network address
     * @return the new channel
     */
    public Channel getNewChannel(InetSocketAddress address) {
        Channel channel;
        // Connect to the TC
        ChannelFuture f = this.bootstrap.connect(address);
        try {
            // Wait up to the connect timeout for the connection to the server;
            // throw an exception if it cannot be established
            f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (f.isCancelled()) {
                throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
            } else if (!f.isSuccess()) {
                throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
            } else {
                channel = f.channel();
            }
        } catch (Exception e) {
            throw new FrameworkException(e, "can not connect to services-server.");
        }
        return channel;
    }

    /**
     * Gets thread prefix.
     *
     * @param threadPrefix the thread prefix
     * @return the thread prefix
     */
    private String getThreadPrefix(String threadPrefix) {
        return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }
}

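That is the overall TM initialization process; read the source yourself for the details.


After afterPropertiesSet in GlobalTransactionScanner has completed, the wrapIfNecessary() method of AbstractAutoProxyCreator (the class that generates proxy objects for beans) is executed. AbstractAutoProxyCreator implements the BeanPostProcessor interface, and wrapIfNecessary is called from postProcessAfterInitialization, which is why it runs after afterPropertiesSet.

Source analysis of wrapIfNecessary: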

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // Global transactions are disabled: return the bean unchanged
    if (disableGlobalTransaction) {
        return bean;
    }
    try {
        // PROXYED_SET records the beans that have already been proxied
        synchronized (PROXYED_SET) {
            // If this bean has already been proxied, there is no need to proxy it again
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            // Reset the method interceptor
            interceptor = null;
            // Check whether the bean must be proxied for TCC mode
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                // Target class of the bean (class-based, not a JDK proxy)
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                // If the bean is a JDK (interface-based) proxy, get its interfaces
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                // Neither the class nor the interfaces carry the annotations: nothing to do
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (interceptor == null) {
                    if (globalTransactionalInterceptor == null) {
                        // Build the globalTransactionalInterceptor
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
                bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                // The bean is not yet an AOP proxy: let the parent class create the proxy
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // The bean is already an AOP proxy: add the advisors to the existing proxy
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

/**
 * Whether the target classes carry the GlobalTransactional annotation, or any of their
 * methods carries the GlobalTransactional or GlobalLock annotation.
 */
private boolean existsAnnotation(Class<?>[] classes) {
    if (CollectionUtils.isNotEmpty(classes)) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
            if (trxAnno != null) {
                return true;
            }
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                // Does the method carry the GlobalTransactional annotation?
                trxAnno = method.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                // Does the method carry the GlobalLock annotation?
                GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                if (lockAnno != null) {
                    return true;
                }
            }
        }
    }
    return false;
}

The code above shows that methods annotated with @GlobalTransactional or @GlobalLock are intercepted by GlobalTransactionalInterceptor.
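The annotation check that decides whether a bean gets the interceptor can be tried out in isolation. The sketch below mirrors the logic of existsAnnotation using reflection only; DemoGlobalTransactional, DemoGlobalLock, and the three service classes are hypothetical stand-ins so that the example compiles without Seata on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class AnnotationScanSketch {

    /** Hypothetical stand-in for @GlobalTransactional (allowed on types and methods). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface DemoGlobalTransactional {
    }

    /** Hypothetical stand-in for @GlobalLock (methods only). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalLock {
    }

    static class PlainService {
        public void run() {
        }
    }

    static class TxService {
        @DemoGlobalTransactional
        public void pay() {
        }
    }

    static class LockService {
        @DemoGlobalLock
        public void update() {
        }
    }

    /** True if any class, or any public method of any class, carries one of the annotations. */
    static boolean existsAnnotation(Class<?>... classes) {
        if (classes == null) {
            return false;
        }
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            if (clazz.isAnnotationPresent(DemoGlobalTransactional.class)) {
                return true;
            }
            for (Method method : clazz.getMethods()) {
                if (method.isAnnotationPresent(DemoGlobalTransactional.class)
                    || method.isAnnotationPresent(DemoGlobalLock.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(existsAnnotation(PlainService.class));
        System.out.println(existsAnnotation(TxService.class));
        System.out.println(existsAnnotation(LockService.class));
    }
}
```

Because getMethods() only returns public methods, an annotation placed on a private method would not be found by a check written this way.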

1.3: GlobalTransactionalInterceptor (global transaction interceptor)

This class proxies the execution of methods annotated with @GlobalTransactional, as the following source shows:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
    // Transaction template
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
    // Failure handler
    private final FailureHandler failureHandler;
    private volatile boolean disable;
    // Service self-check period; 2000 by default, in ms. A self-check runs every 2 seconds
    // to decide whether to degrade or restore
    private static int degradeCheckPeriod;
    // Degrade-check switch; false by default. When enabled, the business side degrades
    // automatically after consecutive errors and bypasses Seata transactions
    private static volatile boolean degradeCheck;
    // Threshold for degrading and restoring; 10 by default
    private static int degradeCheckAllowTimes;
    private static volatile Integer degradeNum = 0;
    private static volatile Integer reachNum = 0;
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
    // Executor for the periodic degrade check; arguably it should only be initialized
    // when degradeCheck = true
    private static ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));

    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler the failure handler
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        // Initialization
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
            DEFAULT_TM_DEGRADE_CHECK);
        // Degrade check is enabled: load its settings
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
    }

    /**
     * Invocation logic of the proxied method.
     *
     * @param methodInvocation the original method being proxied
     */
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Get the class the method belongs to
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // Get the concrete Method object to execute
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            // Get the GlobalTransactional annotation and the attributes defined on it
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            // Get the GlobalLock annotation
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // Whether global transactions are disabled or have been degraded
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                // Decide between the globalTransactional annotation and the globalLock annotation
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation);
                }
            }
        }
        // Execute the actual method
        return methodInvocation.proceed();
    }

    /**
     * Runs the method through the global lock template.
     */
    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        return globalLockTemplate.execute(() -> {
            try {
                return methodInvocation.proceed();
            } catch (Exception e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * The core of the proxy; it is essentially implemented through transactionalTemplate.
     */
    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    // Execute the original method
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    // Build the TransactionInfo from the annotation settings;
                    // transactionalTemplate needs it
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            // The code says in which phase of the transaction the exception occurred;
            // each case takes a different branch
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    // Exception while beginning the transaction
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    // Exception while committing
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    // Exception while rolling back
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    // Rollback is being retried
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    // Any other code
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

    public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
        return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass))
            .orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));
    }

    // Build a unique name for the method; the parameter types are included so that
    // overloaded methods do not collide
    private String formatMethod(Method method) {
        StringBuilder sb = new StringBuilder(method.getName()).append("(");
        // Parameter types of the method
        Class<?>[] params = method.getParameterTypes();
        int in = 0;
        for (Class<?> clazz : params) {
            sb.append(clazz.getName());
            if (++in < params.length) {
                sb.append(", ");
            }
        }
        return sb.append(")").toString();
    }

    /**
     * Listens for ConfigurationChangeEvent; only changes to disable_global_transaction
     * and client_degrade_check are handled.
     *
     * @param event the event
     */
    @Override
    public void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disable, event.getNewValue());
            disable = Boolean.parseBoolean(event.getNewValue().trim());
        } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
            degradeCheck = Boolean.parseBoolean(event.getNewValue());
            if (!degradeCheck) {
                degradeNum = 0;
            }
        }
    }

    /**
     * auto upgrade service detection
     */
    private static void startDegradeCheck() {
        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    TransactionManagerHolder.get().commit(xid);
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }

    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++;
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }
}
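The degrade logic in onDegradeCheck is easy to lose among the nested branches, so here is the same counter logic reduced to a small state machine. It is a sketch for experimenting with the thresholds, assuming a single-threaded caller; the class name DegradeCounterSketch and its methods are hypothetical, and the event bus, logging, and configuration are left out.

```java
final class DegradeCounterSketch {

    private final int allowTimes;
    /** Consecutive failures counted so far (capped at allowTimes). */
    private int degradeNum;
    /** Consecutive successes counted while degraded. */
    private int reachNum;

    DegradeCounterSketch(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    /** Degraded once the failure count has reached the threshold. */
    boolean isDegraded() {
        return degradeNum >= allowTimes;
    }

    /** Feeds one check result into the counters, following the branches of onDegradeCheck. */
    void onCheck(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                // Degraded: count successes until enough have been seen in a row to restore.
                reachNum++;
                if (reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                }
            } else if (degradeNum != 0) {
                // Not degraded yet: a success clears the failure streak.
                degradeNum = 0;
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++;
            } else if (reachNum != 0) {
                // Already degraded: a failure clears the recovery streak.
                reachNum = 0;
            }
        }
    }

    public static void main(String[] args) {
        DegradeCounterSketch counter = new DegradeCounterSketch(3);
        for (int i = 0; i < 3; i++) {
            counter.onCheck(false);
        }
        System.out.println("degraded after 3 failures: " + counter.isDegraded());
    }
}
```

The same threshold is used in both directions: that many consecutive failures switch global transactions off, and that many consecutive successes while degraded switch them back on.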

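The invoke method above shows that the real flow is carried out by the execute method of TransactionalTemplate, as shown below:

1.4: TransactionalTemplate (transaction execution template)

This class encapsulates the template for executing a transaction: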

public class TransactionalTemplate {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);

    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get the TransactionInfo that GlobalTransactionalInterceptor built from the annotation
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get the current global transaction or create one (DefaultGlobalTransaction by default),
        // depending on whether the current context already holds an xid
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 1.2 Handle the different propagation levels and branchType
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            // Handle the different transaction propagation levels
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        // No transaction exists: execute directly without creating one
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        // A transaction already exists
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s",
                                RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException(
                            "No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
            try {
                // 2. Begin the transaction
                beginTransaction(txInfo, tx);
                Object rs = null;
                try {
                    // Execute the business code
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The business code threw: decide whether the exception requires a rollback
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. Everything has completed; commit the transaction
                commitTransaction(tx);
                // Return the result
                return rs;
            } finally {
                // 5. Clean up
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }
    }

    public boolean existingTransaction() {
        return StringUtils.isNotEmpty(RootContext.getXID());
    }

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx,
                                                  Throwable originalException)
        throws TransactionalExecutor.ExecutionException {
        // roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // A rollback is required
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Rollback failed: throw a RollbackFailure exception, handled by GlobalTransactionalInterceptor
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // This exception does not require a rollback; commit directly
            commitTransaction(tx);
        }
    }

    /**
     * Commits the transaction.
     */
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // Run the before-commit hooks
            triggerBeforeCommit();
            tx.commit();
            // Run the after-commit hooks
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 The commit failed
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }

    /**
     * Rolls back a transaction.
     */
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException)
        throws TransactionException, TransactionalExecutor.ExecutionException {
        // Run the before-rollback hooks
        triggerBeforeRollback();
        tx.rollback();
        // Run the after-rollback hooks
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx,
            GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
                ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone,
            originalException);
    }

    /**
     * Begins a transaction.
     */
    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx)
        throws TransactionalExecutor.ExecutionException {
        try {
            // Run the before-begin hooks
            triggerBeforeBegin();
            // Underneath, the work is done by GlobalTransaction
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // Run the after-begin hooks
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
    }

    private void triggerBeforeBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerBeforeRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerBeforeCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterCompletion() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCompletion();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
            }
        }
    }

    private void cleanUp() {
        TransactionHookManager.clear();
    }

    private List<TransactionHook> getCurrentHooks() {
        return TransactionHookManager.getHooks();
    }
}

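The source above shows that committing and rolling back a global transaction both go through the GlobalTransaction interface.

1.4: GlobalTransaction

This interface provides all the transaction-related methods; the concrete implementation is DefaultGlobalTransaction.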
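Stripped of propagation handling and hooks, the core of the template is begin, run the business code, then commit or roll back, then clean up. The sketch below shows only that skeleton and is not Seata's implementation: FakeTx is a hypothetical in-memory transaction that records what happened to it, and a single noRollbackFor class stands in for the full rule set; it assumes that an exception triggers a rollback unless it matches that class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TemplateSketch {

    /** Hypothetical transaction that only records the calls made on it. */
    static final class FakeTx {
        final List<String> log = new ArrayList<>();

        void begin() {
            log.add("begin");
        }

        void commit() {
            log.add("commit");
        }

        void rollback() {
            log.add("rollback");
        }
    }

    /**
     * Runs the business code inside the transaction.
     *
     * @param noRollbackFor exceptions of this type commit instead of rolling back; may be null
     */
    static <T> T execute(FakeTx tx, Callable<T> business, Class<? extends Throwable> noRollbackFor)
        throws Exception {
        tx.begin();
        try {
            T result;
            try {
                result = business.call();
            } catch (Exception ex) {
                // Decide between rollback and commit, then rethrow the business exception.
                if (noRollbackFor != null && noRollbackFor.isInstance(ex)) {
                    tx.commit();
                } else {
                    tx.rollback();
                }
                throw ex;
            }
            tx.commit();
            return result;
        } finally {
            // Always runs, mirroring the cleanup step of the template.
            tx.log.add("cleanup");
        }
    }

    public static void main(String[] args) throws Exception {
        FakeTx tx = new FakeTx();
        System.out.println(execute(tx, () -> "ok", null));
        System.out.println(tx.log);
    }
}
```

Note that the business exception is rethrown in both branches, so the caller still sees the failure even when the transaction is committed because of a no-rollback rule.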

public interface GlobalTransaction {

    /**
     * Begins a new global transaction with the default timeout and name.
     */
    void begin() throws TransactionException;

    /**
     * Begins a new global transaction with the given timeout and the default name.
     */
    void begin(int timeout) throws TransactionException;

    /**
     * Begins a new global transaction with the given timeout and the given name.
     */
    void begin(int timeout, String name) throws TransactionException;

    /**
     * Commits the global transaction.
     */
    void commit() throws TransactionException;

    /**
     * Rolls back the global transaction.
     */
    void rollback() throws TransactionException;

    /**
     * Suspends the global transaction.
     */
    SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException;

    /**
     * Resumes the global transaction.
     */
    void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;

    /**
     * Asks the TC for the current status of the corresponding global transaction.
     */
    GlobalStatus getStatus() throws TransactionException;

    /**
     * Gets the XID.
     *
     * @return the xid
     */
    String getXid();

    /**
     * Reports the global transaction status to the TC.
     */
    void globalReport(GlobalStatus globalStatus) throws TransactionException;

    /**
     * The local status of the global transaction.
     */
    GlobalStatus getLocalStatus();
}
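The suspend and resume pair in this interface is what lets the template detach the current xid (for example for REQUIRES_NEW or NOT_SUPPORTED) and restore it afterwards. A minimal sketch of that idea follows, assuming the xid is kept in a thread-local holder; XidContextSketch and its methods are hypothetical names and do not reproduce Seata's RootContext.

```java
final class XidContextSketch {

    /** Assumed storage: one xid per thread. */
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) {
        XID.set(xid);
    }

    static String current() {
        return XID.get();
    }

    /** Detaches the xid from the current thread and returns it (null if none was bound). */
    static String suspend() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    /** Rebinds a previously suspended xid; does nothing if there was none. */
    static void resume(String suspendedXid) {
        if (suspendedXid != null && !suspendedXid.isEmpty()) {
            bind(suspendedXid);
        }
    }

    public static void main(String[] args) {
        bind("tx-outer");
        String held = suspend();
        System.out.println("while suspended: " + current());
        resume(held);
        System.out.println("after resume: " + current());
        suspend();
    }
}
```

While the outer xid is suspended, code on the same thread sees no transaction and can either run without one or begin a new one; resuming puts the outer xid back so the original transaction continues.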

1.4: DefaultGlobalTransaction

public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGlobalTransaction.class);

    // Default timeout of a global transaction, in milliseconds
    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;

    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    // Global transaction id (XID)
    private String xid;

    // Global transaction status
    private GlobalStatus status;

    // Role of the current execution flow in the global transaction: Launcher or Participant
    private GlobalTransactionRole role;

    private static final int COMMIT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_COMMIT_RETRY_COUNT, DEFAULT_TM_COMMIT_RETRY_COUNT);

    private static final int ROLLBACK_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_ROLLBACK_RETRY_COUNT, DEFAULT_TM_ROLLBACK_RETRY_COUNT);

    /**
     * Instantiates a new default global transaction.
     */
    DefaultGlobalTransaction() {
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }

    /**
     * Instantiates a new default global transaction.
     *
     * @param xid    the xid
     * @param status the status
     * @param role   the role
     */
    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();
        this.xid = xid;
        this.status = status;
        this.role = role;
    }

    @Override
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    @Override
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        // Only the Launcher (the initiator) actually begins the transaction
        if (role != GlobalTransactionRole.Launcher) {
            // A participant must already carry an XID in its context; throw if it does not
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        // The XID must still be null here; a non-null XID means the transaction state is wrong
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        // Obtain a new XID from the TC
        xid = transactionManager.begin(null, null, name, timeout);
        // Update the local transaction status
        status = GlobalStatus.Begin;
        // Bind the XID to the global transaction context
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }

    @Override
    public void commit() throws TransactionException {
        // A Participant cannot commit the global transaction
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // Number of commit attempts, 5 by default
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            // Keep retrying until the commit succeeds or the attempts are used up
            while (retry > 0) {
                try {
                    // Commit the transaction
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // One attempt used up
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                // Unbind the XID
                suspend(true);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }

    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        // Check that the XID is present
        assertXIDNotNull();
        // Number of rollback attempts, 5 by default
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    // Roll back and record the returned status
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                // Suspend the transaction and unbind the XID
                suspend(true);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }

    /**
     * Suspend the global transaction.
     *
     * @param unbindXid if true, suspend the global transaction.
     * @return the holder of the suspended XID
     * @throws TransactionException
     */
    @Override
    public SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException {
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid) && unbindXid) {
            RootContext.unbind();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Suspending current transaction,xid = {}", xid);
            }
        } else {
            xid = null;
        }
        return new SuspendedResourcesHolder(xid);
    }

    /**
     * Resume the global transaction.
     *
     * @param suspendedResourcesHolder the suspended resources to resume
     * @throws TransactionException
     */
    @Override
    public void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException {
        if (suspendedResourcesHolder == null) {
            return;
        }
        String xid = suspendedResourcesHolder.getXid();
        if (StringUtils.isNotEmpty(xid)) {
            RootContext.bind(xid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Resumimg the transaction,xid = {}", xid);
            }
        }
    }

    /**
     * Get the global transaction status held by the TC.
     *
     * @throws TransactionException
     */
    @Override
    public GlobalStatus getStatus() throws TransactionException {
        if (xid == null) {
            return GlobalStatus.UnKnown;
        }
        status = transactionManager.getStatus(xid);
        return status;
    }

    /**
     * Get the global XID.
     */
    @Override
    public String getXid() {
        return xid;
    }

    @Override
    public void globalReport(GlobalStatus globalStatus) throws TransactionException {
        assertXIDNotNull();
        if (globalStatus == null) {
            throw new IllegalStateException();
        }
        // Report the current transaction status to the TC over the network
        status = transactionManager.globalReport(xid, globalStatus);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] report status: {}", xid, status);
        }
        if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }

    /**
     * Get the locally held global transaction status.
     */
    @Override
    public GlobalStatus getLocalStatus() {
        return status;
    }

    private void assertXIDNotNull() {
        if (xid == null) {
            throw new IllegalStateException();
        }
    }

    private void assertXIDNull() {
        if (xid != null) {
            throw new IllegalStateException();
        }
    }
}
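commit() and rollback() above share one pattern: a countdown loop that retries immediately, with no delay between attempts, and rethrows once the last attempt has failed. The sketch below isolates that pattern. RetrySketch, callWithRetry and RetryExhaustedException are illustrative names, not Seata APIs, and the fallback of 5 is hard-coded here as an assumption that mirrors the default retry count noted in the listing.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /** Thrown once every attempt has failed; wraps the last cause. */
    static class RetryExhaustedException extends RuntimeException {
        RetryExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Calls the action until it succeeds or maxAttempts attempts have failed.
     * A non-positive maxAttempts falls back to 5 (assumed default).
     */
    static <T> T callWithRetry(int maxAttempts, Callable<T> action) {
        int retry = maxAttempts <= 0 ? 5 : maxAttempts;
        while (true) {
            try {
                return action.call();
            } catch (Exception ex) {
                // One attempt used up; give up when none are left.
                retry--;
                if (retry == 0) {
                    throw new RetryExhaustedException("all attempts failed", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = callWithRetry(3, () -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IllegalStateException("TC unreachable");
            }
            return "Committed";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Because nothing sleeps between attempts, a TC outage that lasts longer than a few round trips uses up all attempts almost at once. That is why the failure handlers described later keep polling the TC on a timer.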

1.4:DefaultTransactionManager

This class manages the calls the TM makes to the TC.

public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // Build the request
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        // Send the begin request synchronously over Netty
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        // Check the result code
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        // Return the global XID
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // Build the request
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        // Send the commit request synchronously over Netty
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        // Return the current global transaction status
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // Build the request
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        // Send the rollback request synchronously over Netty
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        // Return the current global transaction status
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
        GlobalReportRequest globalReport = new GlobalReportRequest();
        globalReport.setXid(xid);
        globalReport.setGlobalStatus(globalStatus);
        // Report the global transaction status to the TC
        GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}

As the code above shows, every request goes out over Netty through sendSyncRequest(), a method that TmNettyRemotingClient inherits from its parent class AbstractNettyRemotingClient.

1.5:AbstractNettyRemotingClient

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class);private static final String MSG_ID_PREFIX = "msgId:";private static final String FUTURES_PREFIX = "futures:";private static final String SINGLE_LOG_POSTFIX = ";";private static final int MAX_MERGE_SEND_MILLS = 1;private static final String THREAD_PREFIX_SPLIT_CHAR = "_";private static final int MAX_MERGE_SEND_THREAD = 1;private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";protected final Object mergeLock = new Object();/*** When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.*/protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();/*** When batch sending is enabled, the message will be stored to basketMap* Send via asynchronous thread {@link MergedSendRunnable}* {@link NettyClientConfig#isEnableClientBatchSendRequest}*/protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();private final NettyClientBootstrap clientBootstrap;private NettyClientChannelManager clientChannelManager;private final NettyPoolKey.TransactionRole transactionRole;private ExecutorService mergeSendExecutorService;private TransactionMessageHandler transactionMessageHandler;@Overridepublic void init() {//定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为true//批量发送的原理使用 等待(汇集数据)唤醒(发送数据)机制 + 
CompletableFuture获取异步发送回调方法if (NettyClientConfig.isEnableClientBatchSendRequest()) {//定义线程pool 默认1个线程mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));//运行MergedSendRunnable任务即合并发送请求mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//netty client 启动clientBootstrap.start();}public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {super(messageExecutor);this.transactionRole = transactionRole;clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//设置netty handler 接受netty 结果返回结果集并写入到 MessageFuture的rs中clientBootstrap.setChannelHandlers(new ClientHandler());clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {//一般TC肯定为集群部署 通过负载均衡算法获取对应服务器地址String serverAddress = loadBalance(getTransactionServiceGroup());//获取请求超时时间 默认30sint timeoutMillis = NettyClientConfig.getRpcRequestTimeout();//构建请求信息RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);// send batch message// put message into basketMap, @see MergedSendRunnable//是否开启批量发送 默认开启 如果开启将信息缓存到basketMap 然后再统一发送// 在等待批量发送阶段时 这些线程对应任务都在阻塞等待 知道发送成功后才被唤醒if (NettyClientConfig.isEnableClientBatchSendRequest()) {// send batch message is sync request, needs to create messageFuture and put it in futures.//发送批处理消息是同步请求,需要创建messageFuture(本质通过CompletableFuture实现)并将其放入futures缓存中。MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// 
数据写入到basketMap 缓存中ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;//获取请求TC地址对应的任务队列BlockingQueue<RpcMessage> basket = map.get(serverAddress);//为每一个TC地址创建一个发送任务队列if (basket == null) {//数据写入map.putIfAbsent(serverAddress, new LinkedBlockingQueue<>());basket = map.get(serverAddress);}//写入队列basket.offer(rpcMessage);if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}//判断是否是否可以发送if (!isSending) {//唤醒mergeLock 锁下阻塞等待数据的MergedSendRunnable(一次会将堆积的数据全部请求发送掉 堆积的数据大小为上次MergedSendRunnable发送时间)synchronized (mergeLock) {mergeLock.notifyAll();}}try {//在超时时间内阻塞等待结果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}} else {//获取连接通道Netty ChannelChannel channel = clientChannelManager.acquireChannel(serverAddress);//直接将数据发送return super.sendSync(channel, rpcMessage, timeoutMillis);}}@Overridepublic Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {if (channel == null) {LOGGER.warn("sendSyncRequest nothing, caused by null channel.");return null;}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyClientConfig.getRpcRequestTimeout());}@Overridepublic void sendAsyncRequest(Channel channel, Object msg) {if (channel == null) {LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");return;}//构建请求 RpcMessageRpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage? 
ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);if (rpcMessage.getBody() instanceof MergeMessage) {//记录merge后消息缓存体mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());}super.sendAsync(channel, rpcMessage);}@Overridepublic void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE);Channel channel = clientChannelManager.acquireChannel(serverAddress);super.sendAsync(channel, rpcMsg);}/*** 为每个messagetype 注册processor数据解析执行其器* @param requestCode* @param processor   {@link RemotingProcessor}* @param executor    thread pool*/@Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}@Overridepublic void destroyChannel(String serverAddress, Channel channel) {clientChannelManager.destroyChannel(serverAddress, channel);}@Overridepublic void destroy() {clientBootstrap.shutdown();if (mergeSendExecutorService != null) {mergeSendExecutorService.shutdown();}super.destroy();}public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) {this.transactionMessageHandler = transactionMessageHandler;}public TransactionMessageHandler getTransactionMessageHandler() {return transactionMessageHandler;}public NettyClientChannelManager getClientChannelManager() {return clientChannelManager;}/*** 根据注册中心获取tx对应实际地址  并根据负载均衡算法从集群地址中选取任意一个地址* @param transactionServiceGroup tx-service-group 事务分组* @return*/private String loadBalance(String transactionServiceGroup) {InetSocketAddress address = null;try {@SuppressWarnings("unchecked")//根据使用不同的注册中心获取tx对应实际地址()List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);//负载均衡算法(支持两种线性roundRobin与随机)从集群地址中选取任意一个地址address = 
LoadBalanceFactory.getInstance().select(inetSocketAddressList);} catch (Exception ex) {LOGGER.error(ex.getMessage());}if (address == null) {throw new FrameworkException(NoAvailableService);}//转化为标准http://ip+port地址return NetUtil.toStringAddress(address);}private String getThreadPrefix() {return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();}/*** Get pool key function.** @return lambda function*/protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();/*** Get transaction service group.** @return transaction service group*/protected abstract String getTransactionServiceGroup();/*** 合并数据发送任务*/private class MergedSendRunnable implements Runnable {@Overridepublic void run() {//死循环while (true) {//synchronized (mergeLock) {try {//阻塞等待1smergeLock.wait(MAX_MERGE_SEND_MILLS);} catch (InterruptedException e) {}}isSending = true;//循环发送缓存basketMap中存储数据for (String address : basketMap.keySet()) {BlockingQueue<RpcMessage> basket = basketMap.get(address);if (basket.isEmpty()) {continue;}//message 包装类 封装批量的RpcMessageMergedWarpMessage mergeMessage = new MergedWarpMessage();while (!basket.isEmpty()) {RpcMessage msg = basket.poll();mergeMessage.msgs.add((AbstractMessage) msg.getBody());mergeMessage.msgIds.add(msg.getId());}if (mergeMessage.msgIds.size() > 1) {//打印批量日志printMergeMessageLog(mergeMessage);}Channel sendChannel = null;try {// send batch message is sync request, but there is no need to get the return value.// Since the messageFuture has been created before the message is placed in basketMap,// the return value will be obtained in ClientOnResponseProcessor.//获取send channelsendChannel = clientChannelManager.acquireChannel(address);//发送批量同步请求 这里不需要获取返回值 而是通过messageFuture中的ClientOnResponseProcessor来返回AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);} catch (FrameworkException e) {//发送失败处理if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) 
{destroyChannel(address, sendChannel);}// fast fail//发送merge消息失败 将之前futures缓存的messageId与future关系异常并将future回调结果置为nullfor (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);if (messageFuture != null) {messageFuture.setResultMessage(null);}}LOGGER.error("client merge call failed: {}", e.getMessage(), e);}}isSending = false;}}private void printMergeMessageLog(MergedWarpMessage mergeMessage) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());for (AbstractMessage cm : mergeMessage.msgs) {LOGGER.debug(cm.toString());}StringBuilder sb = new StringBuilder();for (long l : mergeMessage.msgIds) {sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);}sb.append("\n");for (long l : futures.keySet()) {sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);}LOGGER.debug(sb.toString());}}}/*** The type ClientHandler.*/@Sharableclass ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//处理TC返回的message数据processMessage(ctx, (RpcMessage) msg);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {synchronized (lock) {if (ctx.channel().isWritable()) {lock.notifyAll();}}ctx.fireChannelWritabilityChanged();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (messageExecutor.isShutdown()) {return;}if (LOGGER.isInfoEnabled()) {LOGGER.info("channel inactive: {}", ctx.channel());}clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));super.channelInactive(ctx);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {if (LOGGER.isInfoEnabled()) {LOGGER.info("channel {} read idle.", ctx.channel());}try 
{String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());clientChannelManager.invalidateObject(serverAddress, ctx.channel());} catch (Exception exx) {LOGGER.error(exx.getMessage());} finally {clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));}}if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {try {if (LOGGER.isDebugEnabled()) {LOGGER.debug("will send ping msg,channel {}", ctx.channel());}AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);} catch (Throwable throwable) {LOGGER.error("send request error: {}", throwable.getMessage(), throwable);}}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(),NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));if (LOGGER.isInfoEnabled()) {LOGGER.info("remove exception rm channel:{}", ctx.channel());}super.exceptionCaught(ctx, cause);}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {if (LOGGER.isInfoEnabled()) {LOGGER.info(ctx + " will closed");}super.close(ctx, future);}}
}
// Sending a single message: sendSync in the parent class AbstractNettyRemoting
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    // Register the future in the futures cache, keyed by message id
    futures.put(rpcMessage.getId(), messageFuture);

    channelWritableCheck(channel, rpcMessage.getBody());

    // Write the message to the Netty channel asynchronously and register a listener on the write
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        // Handle a failed write
        if (!future.isSuccess()) {
            // Remove the pending future from the cache
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                // Record the cause of the failure
                messageFuture1.setResultMessage(future.cause());
            }
            // Destroy (close) the channel
            destroyChannel(future.channel());
        }
    });

    try {
        // Block until the response arrives or the timeout expires
        return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
            rpcMessage.getBody());
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }
}
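The core of sendSync is request/response correlation: register a future under the message id, write the message asynchronously, and block until a handler completes the future for that id. Here is a minimal sketch of the idea without Netty. CorrelationSketch and its transport parameter are hypothetical; the transport stands in for channel.writeAndFlush, and a plain CompletableFuture stands in for MessageFuture. Cleaning up the map entry in finally is an addition of this sketch, not something the listing above does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class CorrelationSketch {

    // Pending requests keyed by message id (plays the role of the futures map).
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Registers a future, hands (id, body) to the transport, then blocks for the reply. */
    Object sendSync(Object body, long timeoutMillis, BiConsumer<Integer, Object> transport)
            throws TimeoutException {
        int id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future);
        try {
            transport.accept(id, body);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            // Sketch-only cleanup so that a timed-out request leaves no stale entry.
            futures.remove(id);
        }
    }

    /** Called by the response handler: completes the future waiting on this id, if any. */
    void onResponse(int id, Object result) {
        CompletableFuture<Object> future = futures.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    /** Number of requests still waiting for a response. */
    int pending() {
        return futures.size();
    }

    public static void main(String[] args) throws Exception {
        CorrelationSketch client = new CorrelationSketch();
        ExecutorService fakeServer = Executors.newSingleThreadExecutor();
        try {
            // The fake server replies on another thread, like a Netty I/O thread would.
            Object reply = client.sendSync("ping", 1000,
                (id, body) -> fakeServer.execute(() -> client.onResponse(id, "echo:" + body)));
            System.out.println(reply);
        } finally {
            fakeServer.shutdown();
        }
    }
}
```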

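As the source above shows, the client decides whether to merge outgoing messages based on the enableClientBatchSendRequest setting. Enabling it raises the overall throughput of the Seata TM at the cost of response latency. Choose the value to suit the workload: with a high volume of distributed transactions, leave it at true (the default); with only a few, set it to false for faster responses.

Whether a message is sent on its own or merged into a batch, it is ultimately written asynchronously through Netty, with a listener registered on the write, and the response comes back through the registered channel handler. ClientHandler (which extends ChannelDuplexHandler and runs when the client receives a response) takes the body of the response and uses its message type to look up the processor that was bound at initialization. For responses this is ClientOnResponseProcessor by default; the processors are bound in the init method of TmNettyRemotingClient or RmNettyRemotingClient. After handling the message, ClientOnResponseProcessor writes the result into the MessageFuture through setResultMessage. MessageFuture holds a CompletableFuture, and its get and setResultMessage methods simply operate on that CompletableFuture, so MessageFuture is essentially a wrapper around it.

After sending a message, the sending thread blocks in MessageFuture.get and waits for the asynchronous result. Only when the response for that message reaches ClientOnResponseProcessor and setResultMessage is called does the sending thread obtain its return value. This is a typical way to build a synchronous call on top of Netty's asynchronous sends. For more detail, refer to the source.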
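The batching path can be reduced to three pieces: a per-address queue (the basketMap), a lock that producers notify, and a merge thread that waits on that lock for at most MAX_MERGE_SEND_MILLS (the constant is 1, so one millisecond) before draining every queue into one merged send. The sketch below is a simplified illustration under those assumptions. MergeSendSketch, enqueue, flushOnce and the batchSender callback are hypothetical names, and messages are plain strings instead of RpcMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

public class MergeSendSketch {

    private final Object mergeLock = new Object();
    // One queue of pending messages per server address.
    private final Map<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();
    // Hypothetical sender: receives (address, merged batch).
    private final BiConsumer<String, List<String>> batchSender;
    private volatile boolean running = true;

    MergeSendSketch(BiConsumer<String, List<String>> batchSender) {
        this.batchSender = batchSender;
    }

    /** Producer side: queue the message for its server, then wake the merge thread. */
    void enqueue(String address, String message) {
        basketMap.computeIfAbsent(address, k -> new LinkedBlockingQueue<>()).offer(message);
        synchronized (mergeLock) {
            mergeLock.notifyAll();
        }
    }

    /** One pass of the merge thread: drain each non-empty queue into a single batch. */
    void flushOnce() {
        for (Map.Entry<String, BlockingQueue<String>> entry : basketMap.entrySet()) {
            List<String> batch = new ArrayList<>();
            entry.getValue().drainTo(batch);
            if (!batch.isEmpty()) {
                batchSender.accept(entry.getKey(), batch);
            }
        }
    }

    /** Merge thread loop: wait at most 1 ms for new messages, then flush. */
    void runLoop() {
        while (running) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            flushOnce();
        }
    }

    void stop() {
        running = false;
    }

    public static void main(String[] args) {
        MergeSendSketch merger = new MergeSendSketch(
            (address, batch) -> System.out.println(address + " <- " + batch));
        merger.enqueue("127.0.0.1:8091", "begin");
        merger.enqueue("127.0.0.1:8091", "commit");
        merger.enqueue("127.0.0.1:8092", "rollback");
        merger.flushOnce(); // one merged send per server address
    }
}
```

The main method calls flushOnce directly to keep the demonstration deterministic; in a running client, runLoop would be submitted to a single-thread executor, as mergeSendExecutorService is in init().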

class NettyClientChannelManager {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientChannelManager.class);//每一个channel对应的lock 保证每一个TC server对只会有一个channel 被创建private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();//TC-server地址(ip+port)为key  与NettyPoolKey(reg信息)关系private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();//缓存所有channel TC-server地址(ip+port)为key channel为value RM 或TM client与TC集群的任意节点只会缓存一个channelprivate final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();//管理neety clinet key(reg信息)与channel 关系private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;//Tm 或RM中getPoolKeyFunction函数(reg信息) 映射 serverAddress 与NettyPoolKey关系private Function<String, NettyPoolKey> poolKeyFunction;NettyClientChannelManager(final NettyPoolableFactory keyPoolableFactory, final Function<String, NettyPoolKey> poolKeyFunction,final NettyClientConfig clientConfig) {//初始化 chnnel连接缓存pool 使用apache中连接poolnettyClientKeyPool = new GenericKeyedObjectPool<>(keyPoolableFactory);nettyClientKeyPool.setConfig(getNettyPoolConfig(clientConfig));this.poolKeyFunction = poolKeyFunction;}/*** 本地配置转化为GenericKeyedObjectPool 中连接pool设置* @param clientConfig* @return*/private GenericKeyedObjectPool.Config getNettyPoolConfig(final NettyClientConfig clientConfig) {//设置chnnel poolGenericKeyedObjectPool.Config poolConfig = new GenericKeyedObjectPool.Config();//最大存活数poolConfig.maxActive = clientConfig.getMaxPoolActive();//最小空闲数poolConfig.minIdle = clientConfig.getMinPoolIdle();//最大等待连接时间poolConfig.maxWait = clientConfig.getMaxAcquireConnMills();//测试poolConfig.testOnBorrow = clientConfig.isPoolTestBorrow();poolConfig.testOnReturn = clientConfig.isPoolTestReturn();poolConfig.lifo = clientConfig.isPoolLifo();return poolConfig;}/*** 获取在当前Rpc客户端上注册的所有通道** @return channels*/ConcurrentMap<String, Channel> getChannels() {return channels;}/*** 获与TC的netty客户端channel** @param serverAddress server address* 
@return netty channel*/Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);//已经存在if (channelToServer != null) {//获取alive的channelchannelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to " + serverAddress);}/*不存在channel 需要创建一个新的*///创建一个chanel对应lockchannelLocks.putIfAbsent(serverAddress, new Object());//加锁保证只会创建一个channelsynchronized (channelLocks.get(serverAddress)) {//创建一个channelreturn doConnect(serverAddress);}}/*** Release channel to pool if necessary.* 从pool中释放channel* @param channel channel* @param serverAddress server address*/void releaseChannel(Channel channel, String serverAddress) {if (channel == null || serverAddress == null) { return; }try {//加锁synchronized (channelLocks.get(serverAddress)) {Channel ch = channels.get(serverAddress);if (ch == null) {nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);return;}if (ch.compareTo(channel) == 0) {if (LOGGER.isInfoEnabled()) {LOGGER.info("return to pool, rm channel:{}", channel);}destroyChannel(serverAddress, channel);} else {nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);}}} catch (Exception exx) {LOGGER.error(exx.getMessage());}}/*** 销毁 channel.** @param serverAddress server address* @param channel channel*/void destroyChannel(String serverAddress, Channel channel) {if (channel == null) { return; }try {//从缓存中移除if (channel.equals(channels.get(serverAddress))) {channels.remove(serverAddress);}nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);} catch (Exception exx) {LOGGER.error("return channel to rmPool error:{}", exx.getMessage());}}/*** Reconnect to remote server of current transaction service group.* 从新连接远程远程服务* @param transactionServiceGroup transaction service group*/void reconnect(String transactionServiceGroup) {List<String> availList = null;try 
{//从注册中心获取有效服务地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {//error 日志打印String serviceGroup = RegistryFactory.getInstance().getServiceGroup(transactionServiceGroup);LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);return;}for (String serverAddress : availList) {try {//重新构建channelacquireChannel(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}}/*** 从pool中作废channel* @param serverAddress tc地址* @param channel* @throws Exception*/void invalidateObject(final String serverAddress, final Channel channel) throws Exception {nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);}/*** 注册一个channel到本地缓存* @param serverAddress tc地址* @param channel*/void registerChannel(final String serverAddress, final Channel channel) {//判断channel有效写入缓存if (channels.get(serverAddress) != null && channels.get(serverAddress).isActive()) {return;}channels.put(serverAddress, channel);}/*** 创建一个新的channel* @param serverAddress* @return*/private Channel doConnect(String serverAddress) {//再次校验Channel channelToServer = channels.get(serverAddress);if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {//获取reg NettyPoolKeyNettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);//写入缓存NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);//校验是否为RM reg request请求 若是需要写入resourceIdsif (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) 
previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}//写入pool中channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;}private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {//根据服务名称从注册中心中获取有效的服务信息列表List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);if (CollectionUtils.isEmpty(availInetSocketAddressList)) {return Collections.emptyList();}//转化数据格式return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());}private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {if (rmChannel.isActive()) {return rmChannel;} else {int i = 0;//重新校验channel 是否alive(默认300次-共3s)for (; i < NettyClientConfig.getMaxCheckAliveRetry(); i++) {try {//等待10msThread.sleep(NettyClientConfig.getCheckAliveInternal());} catch (InterruptedException exx) {LOGGER.error(exx.getMessage());}//重新校验rmChannel = channels.get(serverAddress);if (rmChannel != null && rmChannel.isActive()) {return rmChannel;}}//警告 移除无效channelif (i == NettyClientConfig.getMaxCheckAliveRetry()) {LOGGER.warn("channel {} is not active after long wait, close it.", rmChannel);releaseChannel(rmChannel, serverAddress);return null;}}return null;}
}

NettyClientChannelManager, shown above, is the class that manages the channels to the TC.
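The key idea in acquireChannel and doConnect is a per-address lock combined with a second look at the cache inside the lock, so that concurrent callers open at most one channel per TC address. The following sketch shows only that idea. ChannelCacheSketch and its connector function are hypothetical, and the liveness checks and the Apache Commons Pool integration of the real class are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ChannelCacheSketch<C> {

    // One lock object per server address, created on first use.
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
    // At most one cached channel per server address.
    private final ConcurrentMap<String, C> channels = new ConcurrentHashMap<>();
    // Hypothetical connector: opens a connection to the given address.
    private final Function<String, C> connector;

    ChannelCacheSketch(Function<String, C> connector) {
        this.connector = connector;
    }

    /** Returns the cached channel, or creates exactly one under the per-address lock. */
    C acquire(String address) {
        C existing = channels.get(address);
        if (existing != null) {
            return existing;
        }
        Object lock = locks.computeIfAbsent(address, k -> new Object());
        synchronized (lock) {
            // Check again: another thread may have connected while this one waited.
            C again = channels.get(address);
            if (again != null) {
                return again;
            }
            C created = connector.apply(address);
            channels.put(address, created);
            return created;
        }
    }

    /** Removes the channel from the cache only if it is still the cached one. */
    void destroy(String address, C channel) {
        channels.remove(address, channel);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger connects = new AtomicInteger();
        ChannelCacheSketch<String> cache = new ChannelCacheSketch<>(address -> {
            connects.incrementAndGet();
            return "channel-to-" + address;
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> cache.acquire("127.0.0.1:8091"));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("connect calls: " + connects.get());
    }
}
```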

NettyPoolableFactory
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyPoolableFactory.class);

    private final AbstractNettyRemotingClient rpcRemotingClient;

    private final NettyClientBootstrap clientBootstrap;

    /**
     * Instantiates a new Netty key poolable factory.
     *
     * @param rpcRemotingClient the rpc remoting client
     */
    public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {
        this.rpcRemotingClient = rpcRemotingClient;
        this.clientBootstrap = clientBootstrap;
    }

    // Build a new channel and register it with the TC
    @Override
    public Channel makeObject(NettyPoolKey key) {
        InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyPool create channel to " + key);
        }
        Channel tmpChannel = clientBootstrap.getNewChannel(address);
        long start = System.currentTimeMillis();
        Object response;
        Channel channelToServer = null;
        if (key.getMessage() == null) {
            throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
        }
        try {
            response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
            if (!isRegisterSuccess(response, key.getTransactionRole())) {
                rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
            } else {
                channelToServer = tmpChannel;
                rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
            }
        } catch (Exception exx) {
            if (tmpChannel != null) {
                tmpChannel.close();
            }
            throw new FrameworkException(
                "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:"
                + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name()
                + ",channel:" + channelToServer);
        }
        return channelToServer;
    }

    // Check whether the channel was registered successfully
    private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole transactionRole) {
        if (response == null) {
            return false;
        }
        if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {
            if (!(response instanceof RegisterTMResponse)) {
                return false;
            }
            RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
            return registerTMResponse.isIdentified();
        } else if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) {
            if (!(response instanceof RegisterRMResponse)) {
                return false;
            }
            RegisterRMResponse registerRMResponse = (RegisterRMResponse)response;
            return registerRMResponse.isIdentified();
        }
        return false;
    }

    private String getVersion(Object response, NettyPoolKey.TransactionRole transactionRole) {
        if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {
            return ((RegisterTMResponse) response).getVersion();
        } else {
            return ((RegisterRMResponse) response).getVersion();
        }
    }

    @Override
    public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
        if (channel != null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("will destroy channel:" + channel);
            }
            channel.disconnect();
            channel.close();
        }
    }

    @Override
    public boolean validateObject(NettyPoolKey key, Channel obj) {
        if (obj != null && obj.isActive()) {
            return true;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("channel valid false,channel:" + obj);
        }
        return false;
    }

    @Override
    public void activateObject(NettyPoolKey key, Channel obj) throws Exception {
    }

    @Override
    public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {
    }
}

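The class above manages channels: creating them (including registration with the TC), validating them so that a broken channel can be replaced by a new connection, and destroying them.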
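To make the create / validate / destroy lifecycle concrete, here is a minimal sketch that does not use Netty or commons-pool. `FakeChannel` and `SimpleChannelPool` are hypothetical names invented for this illustration; the real pool is keyed by NettyPoolKey and also performs the TM/RM registration inside makeObject.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a Netty Channel. */
final class FakeChannel {
    private volatile boolean active = true;
    final String address;

    FakeChannel(String address) {
        this.address = address;
    }

    boolean isActive() {
        return active;
    }

    void close() {
        active = false;
    }
}

/** Minimal keyed pool: at most one live channel per server address. */
final class SimpleChannelPool {
    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    int created = 0;

    // Plays the role of makeObject: connect (and, in Seata, register with the TC).
    private FakeChannel makeObject(String address) {
        created++;
        return new FakeChannel(address);
    }

    // Plays the role of validateObject: only an active channel may be reused.
    private boolean validateObject(FakeChannel channel) {
        return channel != null && channel.isActive();
    }

    // Plays the role of destroyObject: release a channel that failed validation.
    private void destroyObject(FakeChannel channel) {
        if (channel != null) {
            channel.close();
        }
    }

    /** Returns a usable channel, reconnecting when the cached one is broken. */
    synchronized FakeChannel borrow(String address) {
        FakeChannel channel = channels.get(address);
        if (validateObject(channel)) {
            return channel;
        }
        destroyObject(channel);
        channel = makeObject(address);
        channels.put(address, channel);
        return channel;
    }
}
```

Borrowing twice for the same address returns the same channel; once that channel is closed, the next borrow creates a fresh one.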

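2.0: Exception handling

The walkthrough of handleGlobalTransaction() in GlobalTransactionalInterceptor covered the normal flow. When transactionalTemplate.execute() throws, Seata reacts differently depending on the type of exception. The handling is defined by the FailureHandler interface, shown below: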

public interface FailureHandler {

    /**
     * On begin failure.
     */
    void onBeginFailure(GlobalTransaction tx, Throwable cause);

    /**
     * On commit failure.
     */
    void onCommitFailure(GlobalTransaction tx, Throwable cause);

    /**
     * On rollback failure.
     */
    void onRollbackFailure(GlobalTransaction tx, Throwable originalException);

    /**
     * On rollback retrying
     */
    void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}

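As the source shows, FailureHandler covers essentially every failure in the interaction between TM and TC (begin, commit, rollback, and rollback retry). Its default implementation, DefaultFailureHandlerImpl, is assigned when GlobalTransactionalInterceptor is initialized: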

public class DefaultFailureHandlerImpl implements FailureHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);

    /**
     * Retry for at most one hour by default: 360 checks at 10-second intervals.
     */
    private static final int RETRY_MAX_TIMES = 6 * 60;

    // Schedule interval in seconds (10 s by default)
    private static final long SCHEDULE_INTERVAL_SECONDS = 10;

    private static final long TICK_DURATION = 1;

    private static final int TICKS_PER_WHEEL = 8;

    // Hashed wheel timer used to periodically check the transaction status on the TC
    private HashedWheelTimer timer = new HashedWheelTimer(
        new NamedThreadFactory("failedTransactionRetry", 1),
        TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);

    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        // No business logic has run at the begin stage, so no retry is needed
        LOGGER.warn("Failed to begin transaction. ", cause);
    }

    @Override
    public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
        // Any failure during global commit: keep polling the global transaction status on the TC
        LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
        // Any failure during global rollback: keep polling the global transaction status on the TC
        LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
        // The timer checks for the Rollbacked status every 10 s
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
        StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]",
            new String[] {tx.getXid()});
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
            TimeUnit.SECONDS);
    }

    /**
     * Status check that is retried after a failure
     */
    protected class CheckTimerTask implements TimerTask {

        private final GlobalTransaction tx;

        // The status we are waiting for
        private final GlobalStatus required;

        // Number of checks performed so far
        private int count = 0;

        // Set to true once the expected status has been observed
        private boolean isStopped = false;

        protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
            this.tx = tx;
            this.required = required;
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            if (!isStopped) {
                if (++count > RETRY_MAX_TIMES) {
                    // Give up once the retry limit is exceeded
                    LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]",
                        tx.getXid(), RETRY_MAX_TIMES);
                    return;
                }
                // Query the current status of the transaction on the TC
                isStopped = shouldStop(tx, required);
                // Schedule the next check
                timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
        }
    }

    private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
        try {
            // Fetch the current status of this transaction from the TC
            GlobalStatus status = tx.getStatus();
            LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
            // Stop only when the status is the expected one or the transaction is Finished
            if (status == required || status == GlobalStatus.Finished) {
                return true;
            }
        } catch (TransactionException e) {
            LOGGER.error("fetch GlobalTransaction status error", e);
        }
        return false;
    }
}
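The retry budget follows directly from the constants: 360 checks at 10 seconds each is 3600 seconds, or one hour. The polling pattern itself can be sketched with the JDK scheduler instead of Netty's HashedWheelTimer. This is a simplified illustration under stated assumptions: `StatusPoller` and its status strings are hypothetical, and unlike CheckTimerTask it stops as soon as the expected status is seen rather than scheduling one more tick.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Polls a status source at a fixed interval until it reports the required status. */
final class StatusPoller {
    private final int maxTimes;
    private final long intervalMillis;

    StatusPoller(int maxTimes, long intervalMillis) {
        this.maxTimes = maxTimes;
        this.intervalMillis = intervalMillis;
    }

    /** Returns how many status checks were performed before stopping. */
    int pollUntil(Supplier<String> statusSource, String required) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        int[] checks = {0};
        Runnable task = new Runnable() {
            @Override
            public void run() {
                if (checks[0] >= maxTimes) {
                    // Retry budget exhausted: give up.
                    done.countDown();
                    return;
                }
                checks[0]++;
                boolean reached;
                try {
                    reached = required.equals(statusSource.get());
                } catch (RuntimeException e) {
                    // A failed lookup counts as "not there yet", as in shouldStop().
                    reached = false;
                }
                if (reached) {
                    done.countDown();
                    return;
                }
                // Re-arm the same task, like timer.newTimeout(this, ...).
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        };
        scheduler.execute(task);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return checks[0];
    }
}
```

With maxTimes = 360 and intervalMillis = 10000 this reproduces the one-hour budget; the tests use tiny values so that they finish quickly.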

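2: Seata client - RM (analysis based on a Spring Cloud project)

The sections above described the general flow of the TM. The RM is initialized together with it when GlobalTransactionScanner is initialized. Some services never act as a TM and only participate as a branch (RM), so in my opinion there is no need to initialize both; this could be left to the user's choice.

2.1: RM client

As with the TM, the class that manages the RM's channel is named differently in 1.2 and 1.3. In 1.3 the entry point looks like this: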

public class RMClient {

    /**
     * Init.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        // Singleton: obtain the RmNettyRemotingClient
        RmNettyRemotingClient rmNettyRemotingClient =
            RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        // Set the DefaultResourceManager (singleton). It uses the strategy pattern and
        // holds the ResourceManager for each BranchType
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        // Set the DefaultRMHandler (singleton). It uses the strategy pattern and
        // holds the RMHandler for each BranchType, so each kind of distributed
        // transaction is handled by its own strategy
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        // Initialize
        rmNettyRemotingClient.init();
    }
}

RmNettyRemotingClient and TmNettyRemotingClient both extend AbstractNettyRemotingClient, so RmNettyRemotingClient is initialized in much the same way as TmNettyRemotingClient and the process is not repeated here.

2.1:DefaultRMHandler

It holds the RMHandler for each BranchType, so that each kind of distributed transaction implementation is served by its own strategy.

public class DefaultRMHandler extends AbstractRMHandler {

    // Maps each BranchType (AT, XA, TCC, SAGA) to its RMHandler
    protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap
        = new ConcurrentHashMap<BranchType, AbstractRMHandler>();

    protected DefaultRMHandler() {
        initRMHandlers();
    }

    protected void initRMHandlers() {
        List<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);
        if (CollectionUtils.isNotEmpty(allRMHandlers)) {
            for (AbstractRMHandler rmHandler : allRMHandlers) {
                allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);
            }
        }
    }

    // handle() methods for the three flows: commit, rollback, and undo log deletion
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        return getRMHandler(request.getBranchType()).handle(request);
    }

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        return getRMHandler(request.getBranchType()).handle(request);
    }

    @Override
    public void handle(UndoLogDeleteRequest request) {
        getRMHandler(request.getBranchType()).handle(request);
    }

    protected AbstractRMHandler getRMHandler(BranchType branchType) {
        return allRMHandlersMap.get(branchType);
    }

    @Override
    protected ResourceManager getResourceManager() {
        throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
    }

    private static class SingletonHolder {
        private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
    }

    /**
     * Get the DefaultRMHandler singleton.
     *
     * @return the resource manager
     */
    public static AbstractRMHandler get() {
        return DefaultRMHandler.SingletonHolder.INSTANCE;
    }

    @Override
    public BranchType getBranchType() {
        throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
    }
}
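Stripped of the Seata types, the dispatch idea is a map from branch type to handler that is filled once at start-up. The sketch below uses hypothetical names (`BranchKind`, `BranchHandler`, `DispatchingHandler`) and takes the handlers as a plain Iterable where Seata discovers them through EnhancedServiceLoader. It also adds a null check on lookup, as DefaultResourceManager does, which DefaultRMHandler.getRMHandler() itself does not have.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stand-in for BranchType. */
enum BranchKind { AT, TCC, SAGA, XA }

/** One strategy per branch kind. */
interface BranchHandler {
    BranchKind kind();

    String commit(String xid);
}

/** Holds every discovered handler and forwards each call to the matching one. */
final class DispatchingHandler {
    private final Map<BranchKind, BranchHandler> handlers = new EnumMap<>(BranchKind.class);

    DispatchingHandler(Iterable<BranchHandler> discovered) {
        // Index each handler by the branch kind it declares.
        for (BranchHandler handler : discovered) {
            handlers.put(handler.kind(), handler);
        }
    }

    String commit(BranchKind kind, String xid) {
        BranchHandler handler = handlers.get(kind);
        if (handler == null) {
            throw new IllegalStateException("No handler for branch kind " + kind);
        }
        return handler.commit(xid);
    }
}
```

Supporting another branch type then only requires one more BranchHandler implementation; the dispatcher stays untouched.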

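The abstract parent class AbstractRMHandler encapsulates the concrete execution flow and calls the underlying ResourceManager to run the RM's data-layer logic.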

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);

    // Template method pattern
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // Run inside the shared exception-handling template
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    /**
     * delete undo log (AT mode only)
     *
     * @param request the request
     */
    @Override
    public void handle(UndoLogDeleteRequest request) {
        // https://github.com/seata/seata/issues/2226
    }

    /**
     * Do branch commit.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // Delegate the commit to the ResourceManager for this branch type and
        // keep the resulting branch status so it can be reported to the TC
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

    /**
     * Do branch rollback.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        // Keep the resulting branch status so it can be reported to the TC
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    /**
     * get resource manager implement,
     * one for each of the four branch modes: AT, XA, TCC, SAGA
     *
     * @return
     */
    protected abstract ResourceManager getResourceManager();

    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }

    @Override
    public void onResponse(AbstractResultMessage response, RpcContext context) {
        LOGGER.info("the rm client received response msg [{}] from tc server.", response.toString());
    }

    public abstract BranchType getBranchType();
}
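The body of exceptionHandleTemplate() is not shown in this article, so the following is only a sketch of the general pattern, assuming it turns a thrown exception into a failed response instead of letting it propagate. `BranchResult`, `BranchCallback` and `ExceptionTemplate` are hypothetical names, not Seata classes.

```java
/** Hypothetical response type: a result code, an optional message, and a status. */
class BranchResult {
    String resultCode = "Success";
    String msg;
    String status;
}

/** The variable step that each handler supplies. */
interface BranchCallback<Q, R extends BranchResult> {
    void execute(Q request, R response) throws Exception;
}

/** The fixed skeleton: run the step and convert any failure into a failed response. */
final class ExceptionTemplate {
    private ExceptionTemplate() {
    }

    static <Q, R extends BranchResult> void run(BranchCallback<Q, R> callback, Q request, R response) {
        try {
            callback.execute(request, response);
            response.resultCode = "Success";
        } catch (Exception e) {
            // The caller always gets a response object back, never an exception.
            response.resultCode = "Failed";
            response.msg = e.getMessage();
        }
    }
}
```

The benefit of this shape is that doBranchCommit and doBranchRollback can simply throw, while every request sent by the TC still receives a well-formed response.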

2.2:DefaultResourceManager(ResourceManager)

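This class follows the same design as DefaultRMHandler: it uses the strategy pattern and holds the concrete ResourceManager for each of the four branch types (AT, XA, TCC, SAGA). The handler calls it to execute the branch transaction at the data layer.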

public class DefaultResourceManager implements ResourceManager {

    /**
     * all resource managers
     */
    protected static Map<BranchType, ResourceManager> resourceManagers
        = new ConcurrentHashMap<>();

    private DefaultResourceManager() {
        initResourceManagers();
    }

    /**
     * Get resource manager.
     *
     * @return the resource manager
     */
    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * only for mock
     *
     * @param branchType
     * @param rm
     */
    public static void mockResourceManager(BranchType branchType, ResourceManager rm) {
        resourceManagers.put(branchType, rm);
    }

    protected void initResourceManagers() {
        // Load every ResourceManager and cache it by branch type
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }

    // Branch commit; in AT mode this essentially deletes the undo log
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId,
                                     String resourceId, String applicationData)
        throws TransactionException {
        return getResourceManager(branchType).branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId,
                                       String resourceId, String applicationData)
        throws TransactionException {
        return getResourceManager(branchType).branchRollback(branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public Long branchRegister(BranchType branchType, String resourceId,
                               String clientId, String xid, String applicationData, String lockKeys)
        throws TransactionException {
        return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData,
            lockKeys);
    }

    @Override
    public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
                             String applicationData) throws TransactionException {
        getResourceManager(branchType).branchReport(branchType, xid, branchId, status, applicationData);
    }

    @Override
    public boolean lockQuery(BranchType branchType, String resourceId,
                             String xid, String lockKeys) throws TransactionException {
        return getResourceManager(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
    }

    @Override
    public void registerResource(Resource resource) {
        getResourceManager(resource.getBranchType()).registerResource(resource);
    }

    @Override
    public void unregisterResource(Resource resource) {
        getResourceManager(resource.getBranchType()).unregisterResource(resource);
    }

    @Override
    public Map<String, Resource> getManagedResources() {
        Map<String, Resource> allResource = new HashMap<>();
        for (ResourceManager rm : resourceManagers.values()) {
            Map<String, Resource> tempResources = rm.getManagedResources();
            if (tempResources != null) {
                allResource.putAll(tempResources);
            }
        }
        return allResource;
    }

    /**
     * get ResourceManager by Resource Type
     *
     * @param branchType
     * @return
     */
    public ResourceManager getResourceManager(BranchType branchType) {
        ResourceManager rm = resourceManagers.get(branchType);
        if (rm == null) {
            throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
        }
        return rm;
    }

    @Override
    public BranchType getBranchType() {
        throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
    }

    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }
}

Taking AT mode as the example, here is its DataSourceManager (which commits branches asynchronously through AsyncWorker):

public class DataSourceManager extends AbstractResourceManager implements Initialize {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceManager.class);

    private ResourceManagerInbound asyncWorker;

    private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

    /**
     * Sets async worker.
     *
     * @param asyncWorker the async worker
     */
    public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
        this.asyncWorker = asyncWorker;
    }

    /**
     * Lock query: when a global lock is required, ask whether the rows are currently locked globally
     */
    @Override
    public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
        throws TransactionException {
        try {
            // Build the request
            GlobalLockQueryRequest request = new GlobalLockQueryRequest();
            request.setXid(xid);
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            GlobalLockQueryResponse response = null;
            if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                // Ask the TC and get the result
                response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            } else {
                throw new RuntimeException("unknow situation!");
            }
            if (response.getResultCode() == ResultCode.Failed) {
                throw new TransactionException(response.getTransactionExceptionCode(),
                    "Response[" + response.getMsg() + "]");
            }
            // Whether the rows can be locked
            return response.isLockable();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
        }
    }

    @Deprecated
    @SuppressWarnings("unchecked")
    private String loadBalance() {
        InetSocketAddress address = null;
        try {
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(
                TmNettyRemotingClient.getInstance().getTransactionServiceGroup());
            address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);
        } catch (Exception ignore) {
            LOGGER.error(ignore.getMessage());
        }
        if (address == null) {
            throw new FrameworkException(NoAvailableService);
        }
        return NetUtil.toStringAddress(address);
    }

    /**
     * Init.
     *
     * @param asyncWorker the async worker
     */
    public synchronized void initAsyncWorker(ResourceManagerInbound asyncWorker) {
        setAsyncWorker(asyncWorker);
    }

    /**
     * Instantiates a new Data source manager.
     */
    public DataSourceManager() {
    }

    @Override
    public void init() {
        // Create and initialize the AsyncWorker
        AsyncWorker asyncWorker = new AsyncWorker();
        asyncWorker.init();
        initAsyncWorker(asyncWorker);
    }

    // Register the Resource proxy object and cache it locally
    @Override
    public void registerResource(Resource resource) {
        DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
        // Map the resourceId to its DataSourceProxy
        dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
        super.registerResource(dataSourceProxy);
    }

    @Override
    public void unregisterResource(Resource resource) {
        throw new NotSupportYetException("unregister a resource");
    }

    /**
     * Get the data source proxy.
     *
     * @param resourceId the resource id
     * @return the data source proxy
     */
    public DataSourceProxy get(String resourceId) {
        return (DataSourceProxy) dataSourceCache.get(resourceId);
    }

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }

    // Phase two: branch rollback
    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        // Look up the DataSource proxy for this resource
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            // The undo log lives in the same data source as the business data; replay it to roll back
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        // Phase-two rollback succeeded
        return BranchStatus.PhaseTwo_Rollbacked;
    }

    @Override
    public Map<String, Resource> getManagedResources() {
        return dataSourceCache;
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }
}

AsyncWorker executes branch commits asynchronously (mainly the RM side of the phase-two commit, which deletes the undo log):

public class AsyncWorker implements ResourceManagerInbound {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);

    // Default initial capacity of the per-resource map
    private static final int DEFAULT_RESOURCE_SIZE = 16;

    // Under high concurrency many commit requests can pile up; to keep a single delete
    // from becoming too large, cap the number of undo logs deleted per statement
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;

    /**
     * Phase-two context: the data needed for a commit
     */
    private static class Phase2Context {

        /**
         * Instantiates a new phase-two context.
         *
         * @param branchType      the branchType
         * @param xid             the xid
         * @param branchId        the branch id
         * @param resourceId      the resource id
         * @param applicationData the application data
         */
        public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId,
                             String applicationData) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
            this.applicationData = applicationData;
            this.branchType = branchType;
        }

        /** The Xid. */
        String xid;
        /** The Branch id. */
        long branchId;
        /** The Resource id. */
        String resourceId;
        /** The Application data. */
        String applicationData;
        /** the branch Type */
        BranchType branchType;
    }

    // Size of the async commit buffer, 10000 by default
    private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
        CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);

    // Blocking queue holding the Phase2Context objects waiting to be committed
    private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(
        ASYNC_COMMIT_BUFFER_LIMIT);

    // Branch commit
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        // Wrap the request in a Phase2Context and enqueue it
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.",
                branchId, xid);
        }
        // Report PhaseTwo_Committed right away; the undo log is deleted asynchronously
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Init. Called from DataSourceManager.
     */
    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        // Create the scheduler
        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
            new NamedThreadFactory("AsyncWorker", 1, true));
        // Run once every second, after an initial delay of 10 ms
        timerExecutor.scheduleAtFixedRate(() -> {
            try {
                // Commit the buffered branches in batches
                doBranchCommits();
            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... {}", e.getMessage());
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }

    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
            return;
        }
        // Group the pending commit requests by resourceId
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        // Drain the Phase2Context objects submitted above until the queue is empty
        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
            // There may be several data sources, hence several resourceIds
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(
                commitContext.resourceId, k -> new ArrayList<>());
            contextsGroupedByResourceId.add(commitContext);
        }
        // Handle each resource (data source) separately
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            DataSourceProxy dataSourceProxy;
            try {
                try {
                    // Look up the DataSourceProxy for this resource
                    DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                        .getResourceManager(BranchType.AT);
                    dataSourceProxy = resourceManager.get(entry.getKey());
                    if (dataSourceProxy == null) {
                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                    }
                    // Get a database connection
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
                // The commits to execute for this resource
                List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                // Collect the xids and branchIds
                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    xids.add(commitContext.xid);
                    branchIds.add(commitContext.branchId);
                    int maxSize = Math.max(xids.size(), branchIds.size());
                    // Avoid deleting too much at once: flush the undo logs of this
                    // data source every 1000 entries
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                        try {
                            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                                xids, branchIds, conn);
                        } catch (Exception ex) {
                            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                        }
                        xids.clear();
                        branchIds.clear();
                    }
                }
                // Nothing left to delete (note: this returns from the whole method)
                if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                    return;
                }
                try {
                    // Fewer than 1000 entries remain: delete their undo logs directly
                    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                        branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }
                if (!conn.getAutoCommit()) {
                    conn.commit();
                }
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
                try {
                    // Roll back on failure
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
                }
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }
        }
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        throw new NotSupportYetException();
    }
}

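As the code shows, if an error occurs during the batch deletion of undo logs, the deletion is rolled back and those undo log rows stay in the table, so there is a risk of them piling up. Seata handles this by having the TC periodically send an undo log deletion command to the RM; see RmUndoLogProcessor for details.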
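The buffering idea in AsyncWorker (enqueue on commit, then drain, group by resource, and delete in capped batches) can be reproduced without a database. In this sketch `CommitContext` and `BatchingWorker` are hypothetical names, and a "delete" is only recorded as a string. It also moves on to the next resource after each group, whereas the early return in doBranchCommits() leaves the whole method when a group's size is an exact multiple of the limit, which appears to skip the remaining resources for that round.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, trimmed-down counterpart of Phase2Context. */
final class CommitContext {
    final String resourceId;
    final String xid;
    final long branchId;

    CommitContext(String resourceId, String xid, long branchId) {
        this.resourceId = resourceId;
        this.xid = xid;
        this.branchId = branchId;
    }
}

final class BatchingWorker {
    private final BlockingQueue<CommitContext> buffer;
    private final int batchLimit;
    /** One entry per simulated batch delete, formatted as "resourceId:rowCount". */
    final List<String> deletions = new ArrayList<>();

    BatchingWorker(int capacity, int batchLimit) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.batchLimit = batchLimit;
    }

    /** Non-blocking enqueue; returns false when the buffer is full. */
    boolean submit(CommitContext context) {
        return buffer.offer(context);
    }

    /** One scheduler tick: drain the buffer, group by resource, delete in batches. */
    void drainOnce() {
        Map<String, List<CommitContext>> grouped = new LinkedHashMap<>();
        CommitContext context;
        while ((context = buffer.poll()) != null) {
            grouped.computeIfAbsent(context.resourceId, k -> new ArrayList<>()).add(context);
        }
        for (Map.Entry<String, List<CommitContext>> entry : grouped.entrySet()) {
            List<CommitContext> all = entry.getValue();
            // Never hand more than batchLimit rows to a single delete.
            for (int from = 0; from < all.size(); from += batchLimit) {
                int to = Math.min(from + batchLimit, all.size());
                deleteBatch(entry.getKey(), all.subList(from, to));
            }
        }
    }

    private void deleteBatch(String resourceId, List<CommitContext> batch) {
        deletions.add(resourceId + ":" + batch.size());
    }
}
```

Because submit() never blocks, a branch commit can be acknowledged to the TC immediately; whatever does not fit in the buffer is left to the periodic undo log cleanup.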

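2.3: Processors that handle messages from the TC

Depending on the message type (see the constants class MessageType), a different Processor and executor is used. The processors are registered in the same way as described for the TM, during init() of RmNettyRemotingClient. The registration looks like this: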

private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    RmBranchCommitProcessor rmBranchCommitProcessor =
        new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch rollback processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor =
        new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
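The third argument of registerProcessor is the executor on which the processor runs; several registrations above pass null. A plausible reading, and it is an assumption here because the dispatching code is not shown in this article, is that null means "run on the calling I/O thread". The sketch below illustrates that idea with hypothetical names (`MessageProcessor`, `ProcessorTable`) and made-up integer type codes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical processor contract: handle one message body. */
interface MessageProcessor {
    void process(Object body);
}

/** Message-type code -> (processor, optional executor). */
final class ProcessorTable {
    private static final class Entry {
        final MessageProcessor processor;
        final Executor executor;

        Entry(MessageProcessor processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();

    void register(int messageType, MessageProcessor processor, Executor executor) {
        table.put(messageType, new Entry(processor, executor));
    }

    /** Returns false when no processor is registered for the type. */
    boolean dispatch(int messageType, Object body) {
        Entry entry = table.get(messageType);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            // Potentially slow work (commit, rollback, undo log cleanup) goes to a pool.
            entry.executor.execute(() -> entry.processor.process(body));
        } else {
            // Cheap work (completing a future, logging a heartbeat) runs inline.
            entry.processor.process(body);
        }
        return true;
    }
}
```

This split keeps database work off the network threads while letting responses complete their futures without an extra thread hop.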

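2.3.1: RmBranchCommitProcessor, which handles commit

This processor handles branch commit: it receives the phase-two commit request that the TC sends once the TM has asked for a global commit. In AT mode this essentially means deleting the undo log.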

public class RmBranchCommitProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);

    private TransactionMessageHandler handler;

    private RemotingClient remotingClient;

    public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Remote address of the TC
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        // Message body
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        // Execute the branch commit
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        // Let the local branch handler execute the commit
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            // Report the local branch result to the TC asynchronously
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
}

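The actual work is done by the DefaultRMHandler described above. The branch's final result is reported back to the TC, which decides how the global transaction proceeds.

2.3.2: RmBranchRollbackProcessor, which handles rollback

This processor handles branch rollback: it receives the phase-two rollback request that the TC sends once the TM has asked for a global rollback, and it replays the undo log to roll the branch back.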

public class RmBranchRollbackProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchRollbackProcessor.class);

    private TransactionMessageHandler handler;

    private RemotingClient remotingClient;

    public RmBranchRollbackProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Address of the TC
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm handle branch rollback process:" + msg);
        }
        handleBranchRollback(rpcMessage, remoteAddress, (BranchRollbackRequest) msg);
    }

    private void handleBranchRollback(RpcMessage request, String serverAddress,
                                      BranchRollbackRequest branchRollbackRequest) {
        BranchRollbackResponse resultMessage;
        // Replay the undo log locally; call chain: handler -> resourceManager
        resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + resultMessage);
        }
        try {
            // Send the branch rollback result to the TC
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
        }
    }
}

2.3.3: RmUndoLogProcessor, which handles undo log deletion

It handles the undo log deletion command sent by the TC.

/**
 * Handles the TC's undo log delete command
 * {@link UndoLogDeleteRequest}
 */
public class RmUndoLogProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmUndoLogProcessor.class);

    private TransactionMessageHandler handler;

    public RmUndoLogProcessor(TransactionMessageHandler handler) {
        this.handler = handler;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // The UndoLogDeleteRequest sent by the TC
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm handle undo log process:" + msg);
        }
        // Execute the undo log deletion
        handleUndoLogDelete((UndoLogDeleteRequest) msg);
    }

    private void handleUndoLogDelete(UndoLogDeleteRequest undoLogDeleteRequest) {
        try {
            // Call chain: ends up directly in RMHandlerAT
            handler.onRequest(undoLogDeleteRequest, null);
        } catch (Exception e) {
            LOGGER.error("Failed to delete undo log by undoLogDeleteRequest on" + undoLogDeleteRequest.getResourceId());
        }
    }
}

RMHandlerAT: the handler for AT mode

public class RMHandlerAT extends AbstractRMHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(RMHandlerAT.class);

    private static final int LIMIT_ROWS = 3000;

    /**
     * Handles an UndoLogDeleteRequest
     *
     * @param request the request
     */
    @Override
    public void handle(UndoLogDeleteRequest request) {
        // Look up the DataSource proxy to clean
        DataSourceManager dataSourceManager = (DataSourceManager)getResourceManager();
        DataSourceProxy dataSourceProxy = dataSourceManager.get(request.getResourceId());
        if (dataSourceProxy == null) {
            LOGGER.warn("Failed to get dataSourceProxy for delete undolog on {}", request.getResourceId());
            return;
        }
        // Cutoff: the current time minus saveDays (7 days by default)
        Date logCreatedSave = getLogCreated(request.getSaveDays());
        Connection conn = null;
        try {
            conn = dataSourceProxy.getPlainConnection();
            // Number of rows deleted in the current pass
            int deleteRows = 0;
            do {
                try {
                    // Delete up to 3000 undo log rows created before the cutoff
                    deleteRows = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType())
                        .deleteUndoLogByLogCreated(logCreatedSave, LIMIT_ROWS, conn);
                    if (deleteRows > 0 && !conn.getAutoCommit()) {
                        // Commit manually
                        conn.commit();
                    }
                } catch (SQLException exx) {
                    if (deleteRows > 0 && !conn.getAutoCommit()) {
                        conn.rollback();
                    }
                    throw exx;
                }
                // Delete 3000 rows per pass until nothing expired is left
            } while (deleteRows == LIMIT_ROWS);
        } catch (Exception e) {
            LOGGER.error("Failed to delete expired undo_log, error:{}", e.getMessage(), e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }

    // Compute the cutoff time used as the deletion condition
    private Date getLogCreated(int saveDays) {
        if (saveDays <= 0) {
            saveDays = UndoLogDeleteRequest.DEFAULT_SAVE_DAYS;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE, -saveDays);
        return calendar.getTime();
    }

    /**
     * get AT resource manager (DataSourceManager)
     *
     * @return
     */
    @Override
    protected ResourceManager getResourceManager() {
        return DefaultResourceManager.get().getResourceManager(BranchType.AT);
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }
}

This keeps the RM's undo log table from growing without bound when the asynchronous deletion fails.
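The cleanup loop has two parts: computing a cutoff date from saveDays, and deleting in passes of at most LIMIT_ROWS until a pass deletes fewer rows than the limit. The sketch below models the undo log table as an in-memory list of creation times. `ExpiredLogCleaner` is a hypothetical name, and treating rows created at or before the cutoff as expired is an assumption about the comparison used by the real SQL.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

final class ExpiredLogCleaner {
    private ExpiredLogCleaner() {
    }

    /** Cutoff = now minus saveDays; a non-positive value falls back to the default. */
    static Date cutoff(Date now, int saveDays, int defaultSaveDays) {
        if (saveDays <= 0) {
            saveDays = defaultSaveDays;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(now);
        calendar.add(Calendar.DATE, -saveDays);
        return calendar.getTime();
    }

    /**
     * Deletes expired entries in passes of at most limitRows each.
     * Returns the total number of entries removed.
     */
    static int deleteExpired(List<Date> logCreated, Date cutoff, int limitRows) {
        int total = 0;
        int deleted;
        do {
            deleted = 0;
            Iterator<Date> it = logCreated.iterator();
            while (it.hasNext() && deleted < limitRows) {
                if (!it.next().after(cutoff)) {
                    it.remove();
                    deleted++;
                }
            }
            total += deleted;
            // A full pass means more expired rows may remain, so go again.
        } while (deleted == limitRows);
        return total;
    }
}
```

Capping each pass keeps every individual delete (and, in the real code, every commit) small, which limits how long rows stay locked on a busy table.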

2.3.4: ClientOnResponseProcessor, which handles response messages for the RM

It is the same class, with the same role, as the ClientOnResponseProcessor used by the TM.

public class ClientOnResponseProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientOnResponseProcessor.class);

    /**
     * Caches the mapping between a message id and its merged message.
     * The map is held by AbstractNettyRemotingClient.
     */
    private Map<Integer, MergeMessage> mergeMsgMap;

    /**
     * Caches the mapping between each message id (for a merged message, the id of
     * each message inside it) and its MessageFuture.
     * The map is held by AbstractNettyRemoting.
     */
    private ConcurrentMap<Integer, MessageFuture> futures;

    /**
     * To handle the received RPC message on upper level.
     */
    private TransactionMessageHandler transactionMessageHandler;

    public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
                                     ConcurrentHashMap<Integer, MessageFuture> futures,
                                     TransactionMessageHandler transactionMessageHandler) {
        this.mergeMsgMap = mergeMsgMap;
        this.futures = futures;
        this.transactionMessageHandler = transactionMessageHandler;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Check whether this is the response to a merged (batched) send
        if (rpcMessage.getBody() instanceof MergeResultMessage) {
            // Get the results
            MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
            // Remove the cached merged request that belongs to this response
            MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
            for (int i = 0; i < mergeMessage.msgs.size(); i++) {
                // Handle each message and write its result into the matching future
                int msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = futures.remove(msgId);
                if (future == null) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("msg: {} is not found in futures.", msgId);
                    }
                } else {
                    // Write the result back; the requester is blocked waiting for it
                    future.setResultMessage(results.getMsgs()[i]);
                }
            }
        } else {
            // Single, non-merged message
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                    if (transactionMessageHandler != null) {
                        transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                    }
                }
            }
        }
    }
}
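The core idea of the processor above is correlating a response with the request that is waiting for it, by message id. The following is a minimal, self-contained sketch of that idea only, not Seata's implementation: the class ResponseCorrelationSketch and all of its method names are hypothetical, java.util.concurrent.CompletableFuture stands in for Seata's MessageFuture, plain strings stand in for message bodies, and the merged results are assumed to line up one-to-one with the ids of the merged request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ResponseCorrelationSketch {

    // Pending requests keyed by message id (plays the role of the `futures` map).
    private final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Batch id -> ids of the messages merged into that batch (plays the role of `mergeMsgMap`).
    private final Map<Integer, List<Integer>> mergedIds = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the future its sender waits on. */
    public CompletableFuture<String> register(int msgId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(msgId, future);
        return future;
    }

    /** Remembers which message ids were merged into one batch request. */
    public void registerBatch(int batchId, List<Integer> msgIds) {
        mergedIds.put(batchId, msgIds);
    }

    /** Completes the future for a single response; returns false if nobody is waiting. */
    public boolean onResponse(int msgId, String body) {
        CompletableFuture<String> future = futures.remove(msgId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    /** Fans a merged response out to the individual futures; returns how many were completed. */
    public int onMergedResponse(int batchId, List<String> results) {
        List<Integer> ids = mergedIds.remove(batchId);
        if (ids == null) {
            return 0;
        }
        int completed = 0;
        for (int i = 0; i < ids.size(); i++) {
            // Assumption: results.get(i) is the answer to the i-th merged message.
            if (onResponse(ids.get(i), results.get(i))) {
                completed++;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        ResponseCorrelationSketch sketch = new ResponseCorrelationSketch();
        CompletableFuture<String> first = sketch.register(10);
        CompletableFuture<String> second = sketch.register(11);
        sketch.registerBatch(100, Arrays.asList(10, 11));
        sketch.onMergedResponse(100, Arrays.asList("result-10", "result-11"));
        System.out.println(first.join() + ", " + second.join());
    }
}
```

Removing the entry from the map before completing the future means a duplicate or late response for the same id finds nothing to complete, which corresponds to the "not found in futures" branch in the processor.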

2.3.5: ClientHeartbeatProcessor, which handles heartbeat messages for the RM

This processor handles the heartbeat check from the TC.

public class ClientHeartbeatProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientHeartbeatProcessor.class);

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Received a PONG from the TC
        if (rpcMessage.getBody() == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from {}", ctx.channel().remoteAddress());
            }
        }
    }
}
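Note that the processor compares the message body with HeartbeatMessage.PONG using == rather than equals. The sketch below illustrates when such an identity check is safe, under the assumption that heartbeat bodies are always the same shared constant instances (for example because the decoder maps heartbeat frames back onto those constants). The classes HeartbeatSketch and Heartbeat are hypothetical stand-ins, not Seata classes.

```java
public class HeartbeatSketch {

    /** Hypothetical heartbeat message type with exactly two shared instances. */
    public static final class Heartbeat {
        public static final Heartbeat PING = new Heartbeat("PING");
        public static final Heartbeat PONG = new Heartbeat("PONG");

        private final String name;

        // Private constructor: no instances other than PING and PONG can exist.
        private Heartbeat(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    private int pongCount;

    /** Returns true if the body is the shared PONG instance, counting it as a liveness signal. */
    public boolean process(Object body) {
        // Identity comparison is enough because PONG is a singleton constant.
        if (body == Heartbeat.PONG) {
            pongCount++;
            return true;
        }
        return false;
    }

    public int getPongCount() {
        return pongCount;
    }

    public static void main(String[] args) {
        HeartbeatSketch sketch = new HeartbeatSketch();
        sketch.process(Heartbeat.PONG);
        sketch.process(Heartbeat.PING);
        System.out.println("PONGs received: " + sketch.getPongCount());
    }
}
```

If heartbeat bodies could be freshly allocated objects, the == check would silently never match, so this pattern only works together with a decoder that reuses the constants.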

Overall execution flow diagram in AT mode
