tcc-transation源码分析与思考
tcc介绍
tcc是分布式事务的一种解决方案,即Try,Commit,Cancel
Try: 尝试执行业务
完成所有业务检查(一致性)预留必须业务资源(准隔离性)
复制代码
Confirm: 确认执行业务
真正执行业务不作任何业务检查只使用Try阶段预留的业务资源Confirm操作满足幂等性
复制代码
Cancel: 取消执行业务
释放Try阶段预留的业务资源Cancel操作满足幂等性
复制代码
本文我会讲解一个实现tcc思想的框架,tcc-transaction
在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量
tcc-transaction使用
在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可
tcc-transaction原理
这边我们主要讲解tcc-transaction和dubbo的整合 tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招 作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象
Transaction
tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现 先看下Transaction对象的参数
private TransactionXid xid;private TransactionStatus status;private TransactionType transactionType;private volatile int retriedCount = 0;private Date createTime = new Date();private Date lastUpdateTime = new Date();private long version = 1;private List<Participant> participants = new ArrayList<Participant>();private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
复制代码
参数 | 解释 |
---|---|
xid | 全局事务id,内容相当于uuid,用来保证事务唯一性 |
status | 事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段 |
transactionType | 事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上 |
retriedCount | 事务重试次数,当confirm或者cancel失败重试时增加 |
createTime | 事务的创建时间 |
lastUpdateTime | 事务最后一次更新时间 |
version | 事务的版本号,乐观锁? |
participants | 事务的参与者 |
attachments | 附加参数,暂时没发现被用到 |
再看下Transaction中两个比较重要的方法
public void commit() {for (Participant participant : participants) {participant.commit();}
}public void rollback() {for (Participant participant : participants) {participant.rollback();}
}
复制代码
执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法
下面讲解Participant抽象
Participant
Participant是对tcc事务参与者的抽象
public class Participant implements Serializable {private static final long serialVersionUID = 4127729421281425247L;private TransactionXid xid;private InvocationContext confirmInvocationContext;private InvocationContext cancelInvocationContext;private Terminator terminator = new Terminator();Class<? extends TransactionContextEditor> transactionContextEditorClass;......
}
复制代码
在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来
public class InvocationContext implements Serializable {private static final long serialVersionUID = -7969140711432461165L;private Class targetClass;private String methodName;private Class[] parameterTypes;private Object[] args;
}
复制代码
元数据包括目标Class,目标方法名,目标参数列表,实际参数列表
Participant也保存了transactionContextEditorClass,用于提取事务上下文
接下来看Participant的commit和rollback方法
public void rollback() {terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);}public void commit() {terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);}
复制代码
会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {try {//从spring容器中获取对应事务参与者实现Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();Method method = null;//获取对应方法反射对象method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());//设置上下文FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());//反射调用return method.invoke(target, invocationContext.getArgs());} catch (Exception e) {throw new SystemException(e);}}return null;}
复制代码
在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去
TransactionContext
TransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去
TransactionContextEditor
public interface TransactionContextEditor {public TransactionContext get(Object target, Method method, Object[] args);public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);}
复制代码
TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象
@Overridepublic TransactionContext get(Object target, Method method, Object[] args) {//获取TransactionContext在args中的位置int position = getTransactionContextParamPosition(method.getParameterTypes());if (position >= 0) {return (TransactionContext) args[position];}return null;}
复制代码
DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象
@Overridepublic TransactionContext get(Object target, Method method, Object[] args) {//从Dubbo Rpc上下文获取String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);if (StringUtils.isNotEmpty(context)) {return JSON.parseObject(context, TransactionContext.class);}return null;}
复制代码
TransactionManager
TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储 TransactionManager包含的方法以及属性如下
变量介绍
- transactionRepository
private TransactionRepository transactionRepository;
复制代码
TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务 2. CURRENT
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
复制代码
这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的 3. executorService
private ExecutorService executorService;
复制代码
线程池,用于异步执行commit或者cancel
方法介绍
- registerTransaction
private void registerTransaction(Transaction transaction) {if (CURRENT.get() == null) {CURRENT.set(new LinkedList<Transaction>());}CURRENT.get().push(transaction);
}
复制代码
把transaction设置到ThreadLocal对象中去,push方法对应入栈 2. begin public Transaction begin() {
Transaction transaction = new Transaction(TransactionType.ROOT);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
复制代码
} 开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建
- propagationNewBegin
public Transaction propagationNewBegin(TransactionContext transactionContext) {Transaction transaction = new Transaction(transactionContext);transactionRepository.create(transaction);registerTransaction(transaction);return transaction;
}
复制代码
这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化
- propagationExistBegin
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());if (transaction != null) {transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));registerTransaction(transaction);return transaction;} else {throw new NoExistedTransactionException();}
}
复制代码
这个方法用于从事务上下文同步事务状态到ThreadLocal
- commit
public void commit(boolean asyncCommit) {//从threadlocal得到当前事务final Transaction transaction = getCurrentTransaction();transaction.changeStatus(TransactionStatus.CONFIRMING);//数据库更新transactiontransactionRepository.update(transaction);if (asyncCommit) {try {Long statTime = System.currentTimeMillis();//通过线程池异步执行事务提交executorService.submit(new Runnable() {@Overridepublic void run() {commitTransaction(transaction);}});logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));} catch (Throwable commitException) {logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);throw new ConfirmingException(commitException);}} else {//同步执行事务提交commitTransaction(transaction);}
}
复制代码
这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法
private void commitTransaction(Transaction transaction) {try {//调用事务参与者的提交方法transaction.commit();//事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿transactionRepository.delete(transaction);} catch (Throwable commitException) {logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);throw new ConfirmingException(commitException);}
}
复制代码
看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务
- rollback
public void rollback(boolean asyncRollback) {final Transaction transaction = getCurrentTransaction();transaction.changeStatus(TransactionStatus.CANCELLING);transactionRepository.update(transaction);if (asyncRollback) {try {executorService.submit(new Runnable() {@Overridepublic void run() {rollbackTransaction(transaction);}});} catch (Throwable rollbackException) {logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);throw new CancellingException(rollbackException);}} else {rollbackTransaction(transaction);}
}
复制代码
这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法
private void rollbackTransaction(Transaction transaction) {try {//调用事务参与者的提交方法transaction.rollback();//事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿transactionRepository.delete(transaction);} catch (Throwable rollbackException) {logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);throw new CancellingException(rollbackException);}
}
复制代码
- cleanAfterCompletion
public void cleanAfterCompletion(Transaction transaction) {if (isTransactionActive() && transaction != null) {Transaction currentTransaction = getCurrentTransaction();if (currentTransaction == transaction) {//栈操作,后进先出CURRENT.get().pop();} else {throw new SystemException("Illegal transaction when clean after completion");}}
}
复制代码
事务结束,从栈中弹出结束的事务。
- enlistParticipant
public void enlistParticipant(Participant participant) {Transaction transaction = this.getCurrentTransaction();transaction.enlistParticipant(participant);transactionRepository.update(transaction);}
复制代码
给事务绑定事务参与者并同步到repository
接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用
核心Aspect
在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数
参数 | 解释 |
---|---|
propagation | 事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务) |
confirmMethod | confirm阶段对应的方法 |
cancelMethod | cancel阶段对应的方法 |
transactionContextEditor | 设置对应transactionContextEditor |
asyncConfirm | 是否异步confirm |
asyncCancel | 是否异步cancel |
@Compensable注解的参数会在下面两个切面中使用到
ConfigurableTransactionAspect
ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {}@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
复制代码
直接看interceptCompensableMethod方法
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {//解析@Compensable注解Method method = CompensableMethodUtils.getCompensableMethod(pjp);Compensable compensable = method.getAnnotation(Compensable.class);Propagation propagation = compensable.propagation();//获取上下文,如果是Root,不会存在上下文,Transaction都还没创建TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());boolean asyncConfirm = compensable.asyncConfirm();boolean asyncCancel = compensable.asyncCancel();boolean isTransactionActive = transactionManager.isTransactionActive();if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());}//计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);switch (methodType) {case ROOT://处理主事务切面return rootMethodProceed(pjp, asyncConfirm, asyncCancel);case PROVIDER://处理提供者事务切面return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);default://消费者事务直接执行,会对应执行远端提供者事务切面return pjp.proceed();}
}
复制代码
在tcc事务内被@Compensable注解的方法分三种
- Root方法,就是这次事务的入口方法
- Normal方法,在Root方法调用的dubbo接口方法
- Provider方法,对应dubbo接口方法的远程实现 被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中
对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理 对于Root方法,执行rootMethodProceed的逻辑
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {Object returnValue = null;Transaction transaction = null;try {//创建事务transaction = transactionManager.begin();try {returnValue = pjp.proceed();} catch (Throwable tryingException) {if (isDelayCancelException(tryingException)) {transactionManager.syncTransaction();} else {logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);//回滚事务transactionManager.rollback(asyncCancel);}throw tryingException;}//提交事务transactionManager.commit(asyncConfirm);} finally {//清理操作transactionManager.cleanAfterCompletion(transaction);}return returnValue;
}
复制代码
会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。
对于Provider方法
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {Transaction transaction = null;try {switch (TransactionStatus.valueOf(transactionContext.getStatus())) {case TRYING://使用transactionContext创建分支事务transaction = transactionManager.propagationNewBegin(transactionContext);//执行被切方法逻辑return pjp.proceed();case CONFIRMING:try {//更新事务状态transaction = transactionManager.propagationExistBegin(transactionContext);//提交事务,不走切面的方法transactionManager.commit(asyncConfirm);} catch (NoExistedTransactionException excepton) {//the transaction has been commit,ignore it.}break;case CANCELLING:try {//更新事务状态transaction = transactionManager.propagationExistBegin(transactionContext);//回滚事务,不走切面的方法transactionManager.rollback(asyncCancel);} catch (NoExistedTransactionException exception) {//the transaction has been rollback,ignore it.}break;}} finally {//清理资源transactionManager.cleanAfterCompletion(transaction);}Method method = ((MethodSignature) (pjp.getSignature())).getMethod();//对于 confirm和 cancel 返回空值//主要针对原始类型做处理,因为不能为nullreturn ReflectionUtils.getNullValue(method.getReturnType());
}
复制代码
可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法
对于normal类型 直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可
ConfigurableCoordinatorAspect
ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {}@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
复制代码
相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {//得到当前事务Transaction transaction = transactionManager.getCurrentTransaction();if (transaction != null) {switch (transaction.getStatus()) {case TRYING://只需要在trying的时候把参与者信息提取出来,设置到transaction中去enlistParticipant(pjp);break;case CONFIRMING:break;case CANCELLING:break;}}//执行目标方法return pjp.proceed(pjp.getArgs());
}
复制代码
在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。
接下来看enlistParticipant如何生成参与者对象
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {//获取@Compensable信息Method method = CompensableMethodUtils.getCompensableMethod(pjp);if (method == null) {throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));}Compensable compensable = method.getAnnotation(Compensable.class);String confirmMethodName = compensable.confirmMethod();String cancelMethodName = compensable.cancelMethod();Transaction transaction = transactionManager.getCurrentTransaction();TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {//设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去//这边的上下文设置后就会调用try逻辑FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());}Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());//目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的//confirm的调用上下文InvocationContext confirmInvocation = new InvocationContext(targetClass,confirmMethodName,method.getParameterTypes(), pjp.getArgs());//cancel的调用上下文InvocationContext cancelInvocation = new InvocationContext(targetClass,cancelMethodName,method.getParameterTypes(), pjp.getArgs());Participant participant =new Participant(xid,confirmInvocation,cancelInvocation,compensable.transactionContextEditor());//把participant设置到transaction,并且同步到持久化存储transactionManager.enlistParticipant(participant);}
复制代码
通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。 在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法
ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入
失败补偿机制
对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务
public void init() {try {MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();//设置定时任务执行的对象和方法jobDetail.setTargetObject(transactionRecovery);jobDetail.setTargetMethod("startRecover");jobDetail.setName("transactionRecoveryJob");jobDetail.setConcurrent(false);jobDetail.afterPropertiesSet();CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();cronTrigger.setBeanName("transactionRecoveryCronTrigger");//设置cron表达式cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());cronTrigger.setJobDetail(jobDetail.getObject());cronTrigger.afterPropertiesSet();scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());scheduler.start();} catch (Exception e) {throw new SystemException(e);}
}
复制代码
在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法
public void startRecover() {//获取所有没被处理的transactionList<Transaction> transactions = loadErrorTransactions();//根据规则处理这些transactionrecoverErrorTransactions(transactions);}
复制代码
分别看下上述两个方法的逻辑
private List<Transaction> loadErrorTransactions() {long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();//获取在RecoverDuration间隔之前未完成的transactionreturn transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));}
复制代码
private void recoverErrorTransactions(List<Transaction> transactions) {for (Transaction transaction : transactions) {//重试次数超过上限的Transaction不再执行补偿if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));continue;}//如果是分支事务,并且超过最长超时时间忽略if (transaction.getTransactionType().equals(TransactionType.BRANCH)&& (transaction.getCreateTime().getTime() +transactionConfigurator.getRecoverConfig().getMaxRetryCount() *transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000> System.currentTimeMillis())) {continue;}try {transaction.addRetriedCount();//对超时的confiring操作重试if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {transaction.changeStatus(TransactionStatus.CONFIRMING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.commit();transactionConfigurator.getTransactionRepository().delete(transaction);} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作|| transaction.getTransactionType().equals(TransactionType.ROOT)) {transaction.changeStatus(TransactionStatus.CANCELLING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.rollback();transactionConfigurator.getTransactionRepository().delete(transaction);}} catch (Throwable throwable) {if (throwable instanceof OptimisticLockException|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);} else {logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);}}}
}
复制代码
注意一点,trying阶段不会重试,失败未处理,会触发canceling操作
思考
分布式事务解决方案
下面列举一些分布式事务解决方案的特性
- 传统的二/三阶段提交 这种解决方案会占用数据库事务资源,在互联网公司很少使用
- 异步确保型事务 基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功 这个难道是依赖mq支持的事务特性?
- 最大努力通知型事务 按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对 目前项目比较常用的方式
- tcc 适用于实时性要求比较高,数据必须可靠的场景
我的理解
依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用
对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。
由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。
朋友们,使用或没使用过tcc的,请留下你的想法。
资源链接
tcc-transaction git地址
微服务架构的分布式事务解决方案
转载于:https://juejin.im/post/5ae16721518825673446bbd2
tcc-transation源码分析与思考相关推荐
- seata 如何开启tcc事物_分布式事务Seata-TCC源码分析
为了更好理解分布式事务,首先提出一个问题: 假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚 ...
- 【报错】flink源码分析: has no more allocated slots与思考
文章目录 一. 任务描述与一句话 1. 任务描述 2. 一句话 二. 日志分析 1. 申请一个task manager 2. 大概3分钟后运行这个tm时,报资源找不到 三. 源码分析与报错机制定位 1 ...
- 微服务发现与注册之Eureka源码分析
作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...
- 鸿蒙内核源码分析:调度机制篇
作者 | 深入研究鸿蒙,鸿蒙内核发烧友 出品 | CSDN(ID:CSDNnews) 头图 | CSDN 下载自东方 IC 阅读之前建议先读本系列其他文章,以便对本文任务调度机制的理解. 为什么要学这 ...
- 并发编程之——读锁源码分析(解释关于锁降级的争议)
1. 前言 在前面的文章 并发编程之--写锁源码分析中,我们分析了 1.8 JUC 中读写锁中的写锁的获取和释放过程,今天来分析一下读锁的获取和释放过程,读锁相比较写锁要稍微复杂一点,其中还有一点有争 ...
- Kubernetes StatefulSet源码分析
2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com,Based on Kubernetes 1.9 摘要:Kube ...
- [JUC-5]ConcurrentHashMap源码分析JDK8
在学习之前,最好先了解下如下知识: 1.ReentrantLock的实现和原理. 2.Synchronized的实现和原理. 3.硬件对并发支持的CAS操作及JVM中Unsafe对CAS的实现. 4. ...
- Snackbar源码分析
目录介绍 1.最简单创造方法 1.1 Snackbar作用 1.2 最简单的创建 1.3 Snackbar消失的几种方式 2.源码分析 2.1 Snackbar的make方法源码分析 2.2 对Sna ...
- Java 集合系列(四)—— ListIterator 源码分析
以脑图的形式来展示Java集合知识,让零碎知识点形成体系 Iterator 对比 Iterator(迭代器)是一种设计模式,是一个对象,用于遍历集合中的所有元素. Iterator 包含四个方 ...
最新文章
- 解决mongodb ISODate相差8小时问题
- 花最少的钱,训超6的机器人:谷歌大脑推出机器人强化学习平台,硬件代码全开源...
- Linux内核--网络栈实现分析(三)--驱动程序层+链路层(上)
- 洛谷4400 BlueMary的旅行(分层图+最大流)
- codeforces1437 E. Make It Increasing——最长上升子序列
- mockito接口没法赋值_Mockito:无法实例化@InjectMocks字段:类型是接口
- 在FAANG面试中破解堆算法
- IE8 select 动态下拉遇到的问题
- Spark实战电影点评系统(一)
- UDP方式的网络通信【示例】
- 手机麦克风结构原理图_麦克风阵列的基本原理、结构组成及声学效果简介
- Google搜索引擎的使用技巧
- 出生日期与年龄python_python根据出生日期返回年龄的方法
- mysql进阶(十九)SQL语句如何精准查找某一时间段的数据
- oracle导出导入同义词,使用datapump 导出导入同义词
- Mybatis源码研究序
- mysql事务排队情况_MySQL事务问题
- 港科资讯 | 香港科技大学与Microsoft香港签订AI商学院2.0合作备忘录
- 启幕 Next ’21 大会 - Google Cloud 与您畅谈技术,论道云端
- 淮北师范大学c语言试卷,2016年淮北师范大学物理与电子信息学院高级语言程序设计(加试)之C语言程序设计复试笔试最后押题五套卷...
热门文章
- CentOS 利用Yum安装mysql后无法启动(MySQL Daemon failed to start.)
- (Access denied for user 'root'@'localhost' (using password: NO))
- 数量queuepoj1149 PIGS
- 亚马逊RDS使用的第三方扩展有漏洞,可导致内部凭据遭泄露
- 英国国家网络安全中心:速修复严重的 MobileIron RCE 漏洞 (CVE-2020-15505)
- 从零开始--系统深入学习android(实践-让我们开始写代码-新手指南-3.Hello,本地化)...
- js文件、图片上传(原生方法和jquery的ajax两种都有)
- 安装labelImg
- 如何解决Greenplum中无法通过标准命令修复的元数据错误
- struts2中的session使用