tcc介绍

tcc是分布式事务的一种解决方案,即Try,Commit,Cancel

Try: 尝试执行业务

完成所有业务检查(一致性)预留必须业务资源(准隔离性)
复制代码

Confirm: 确认执行业务

真正执行业务不作任何业务检查只使用Try阶段预留的业务资源Confirm操作满足幂等性
复制代码

Cancel: 取消执行业务

释放Try阶段预留的业务资源Cancel操作满足幂等性
复制代码

本文我会讲解一个实现tcc思想的框架,tcc-transaction

在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量

tcc-transaction使用

在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可

tcc-transaction原理

这边我们主要讲解tcc-transaction和dubbo的整合 tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招 作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象

Transaction

tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现 先看下Transaction对象的参数

    private TransactionXid xid;private TransactionStatus status;private TransactionType transactionType;private volatile int retriedCount = 0;private Date createTime = new Date();private Date lastUpdateTime = new Date();private long version = 1;private List<Participant> participants = new ArrayList<Participant>();private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
复制代码
参数 解释
xid 全局事务id,内容相当于uuid,用来保证事务唯一性
status 事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段
transactionType 事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上
retriedCount 事务重试次数,当confirm或者cancel失败重试时增加
createTime 事务的创建时间
lastUpdateTime 事务最后一次更新时间
version 事务的版本号,乐观锁?
participants 事务的参与者
attachments 附加参数,暂时没发现被用到

再看下Transaction中两个比较重要的方法

public void commit() {for (Participant participant : participants) {participant.commit();}
}public void rollback() {for (Participant participant : participants) {participant.rollback();}
}
复制代码

执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法

下面讲解Participant抽象

Participant

Participant是对tcc事务参与者的抽象

public class Participant implements Serializable {private static final long serialVersionUID = 4127729421281425247L;private TransactionXid xid;private InvocationContext confirmInvocationContext;private InvocationContext cancelInvocationContext;private Terminator terminator = new Terminator();Class<? extends TransactionContextEditor> transactionContextEditorClass;......
}
复制代码

在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来

public class InvocationContext implements Serializable {private static final long serialVersionUID = -7969140711432461165L;private Class targetClass;private String methodName;private Class[] parameterTypes;private Object[] args;
}
复制代码

元数据包括目标Class,目标方法名,目标参数列表,实际参数列表

Participant也保存了transactionContextEditorClass,用于提取事务上下文

接下来看Participant的commit和rollback方法

 public void rollback() {terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);}public void commit() {terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);}
复制代码

会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass

public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {try {//从spring容器中获取对应事务参与者实现Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();Method method = null;//获取对应方法反射对象method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());//设置上下文FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());//反射调用return method.invoke(target, invocationContext.getArgs());} catch (Exception e) {throw new SystemException(e);}}return null;}
复制代码

在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去

TransactionContext

TransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去

TransactionContextEditor

public interface TransactionContextEditor {public TransactionContext get(Object target, Method method, Object[] args);public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);}
复制代码

TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象

@Overridepublic TransactionContext get(Object target, Method method, Object[] args) {//获取TransactionContext在args中的位置int position = getTransactionContextParamPosition(method.getParameterTypes());if (position >= 0) {return (TransactionContext) args[position];}return null;}
复制代码

DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象

@Overridepublic TransactionContext get(Object target, Method method, Object[] args) {//从Dubbo Rpc上下文获取String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);if (StringUtils.isNotEmpty(context)) {return JSON.parseObject(context, TransactionContext.class);}return null;}
复制代码

TransactionManager

TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储 TransactionManager包含的方法以及属性如下

变量介绍

  1. transactionRepository
 private TransactionRepository transactionRepository;
复制代码

TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务 2. CURRENT

private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
复制代码

这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的 3. executorService

 private ExecutorService executorService;
复制代码

线程池,用于异步执行commit或者cancel

方法介绍

  1. registerTransaction
private void registerTransaction(Transaction transaction) {if (CURRENT.get() == null) {CURRENT.set(new LinkedList<Transaction>());}CURRENT.get().push(transaction);
}
复制代码

把transaction设置到ThreadLocal对象中去,push方法对应入栈 2. begin public Transaction begin() {

Transaction transaction = new Transaction(TransactionType.ROOT);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
复制代码

} 开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建

  1. propagationNewBegin
public Transaction propagationNewBegin(TransactionContext transactionContext) {Transaction transaction = new Transaction(transactionContext);transactionRepository.create(transaction);registerTransaction(transaction);return transaction;
}
复制代码

这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化

  1. propagationExistBegin
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());if (transaction != null) {transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));registerTransaction(transaction);return transaction;} else {throw new NoExistedTransactionException();}
}
复制代码

这个方法用于从事务上下文同步事务状态到ThreadLocal

  1. commit
public void commit(boolean asyncCommit) {//从threadlocal得到当前事务final Transaction transaction = getCurrentTransaction();transaction.changeStatus(TransactionStatus.CONFIRMING);//数据库更新transactiontransactionRepository.update(transaction);if (asyncCommit) {try {Long statTime = System.currentTimeMillis();//通过线程池异步执行事务提交executorService.submit(new Runnable() {@Overridepublic void run() {commitTransaction(transaction);}});logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));} catch (Throwable commitException) {logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);throw new ConfirmingException(commitException);}} else {//同步执行事务提交commitTransaction(transaction);}
}
复制代码

这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法

private void commitTransaction(Transaction transaction) {try {//调用事务参与者的提交方法transaction.commit();//事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿transactionRepository.delete(transaction);} catch (Throwable commitException) {logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);throw new ConfirmingException(commitException);}
}
复制代码

看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务

  1. rollback
public void rollback(boolean asyncRollback) {final Transaction transaction = getCurrentTransaction();transaction.changeStatus(TransactionStatus.CANCELLING);transactionRepository.update(transaction);if (asyncRollback) {try {executorService.submit(new Runnable() {@Overridepublic void run() {rollbackTransaction(transaction);}});} catch (Throwable rollbackException) {logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);throw new CancellingException(rollbackException);}} else {rollbackTransaction(transaction);}
}
复制代码

这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法

private void rollbackTransaction(Transaction transaction) {try {//调用事务参与者的提交方法transaction.rollback();//事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿transactionRepository.delete(transaction);} catch (Throwable rollbackException) {logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);throw new CancellingException(rollbackException);}
}
复制代码
  1. cleanAfterCompletion
public void cleanAfterCompletion(Transaction transaction) {if (isTransactionActive() && transaction != null) {Transaction currentTransaction = getCurrentTransaction();if (currentTransaction == transaction) {//栈操作,后进先出CURRENT.get().pop();} else {throw new SystemException("Illegal transaction when clean after completion");}}
}
复制代码

事务结束,从栈中弹出结束的事务。

  1. enlistParticipant
public void enlistParticipant(Participant participant) {Transaction transaction = this.getCurrentTransaction();transaction.enlistParticipant(participant);transactionRepository.update(transaction);}
复制代码

给事务绑定事务参与者并同步到repository

接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用

核心Aspect

在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数

参数 解释
propagation 事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务)
confirmMethod confirm阶段对应的方法
cancelMethod cancel阶段对应的方法
transactionContextEditor 设置对应transactionContextEditor
asyncConfirm 是否异步confirm
asyncCancel 是否异步cancel

@Compensable注解的参数会在下面两个切面中使用到

ConfigurableTransactionAspect

ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {}@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
复制代码

直接看interceptCompensableMethod方法

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {//解析@Compensable注解Method method = CompensableMethodUtils.getCompensableMethod(pjp);Compensable compensable = method.getAnnotation(Compensable.class);Propagation propagation = compensable.propagation();//获取上下文,如果是Root,不会存在上下文,Transaction都还没创建TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());boolean asyncConfirm = compensable.asyncConfirm();boolean asyncCancel = compensable.asyncCancel();boolean isTransactionActive = transactionManager.isTransactionActive();if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());}//计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);switch (methodType) {case ROOT://处理主事务切面return rootMethodProceed(pjp, asyncConfirm, asyncCancel);case PROVIDER://处理提供者事务切面return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);default://消费者事务直接执行,会对应执行远端提供者事务切面return pjp.proceed();}
}
复制代码

在tcc事务内被@Compensable注解的方法分三种

  1. Root方法,就是这次事务的入口方法
  2. Normal方法,在Root方法调用的dubbo接口方法
  3. Provider方法,对应dubbo接口方法的远程实现 被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中

对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理 对于Root方法,执行rootMethodProceed的逻辑

private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {Object returnValue = null;Transaction transaction = null;try {//创建事务transaction = transactionManager.begin();try {returnValue = pjp.proceed();} catch (Throwable tryingException) {if (isDelayCancelException(tryingException)) {transactionManager.syncTransaction();} else {logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);//回滚事务transactionManager.rollback(asyncCancel);}throw tryingException;}//提交事务transactionManager.commit(asyncConfirm);} finally {//清理操作transactionManager.cleanAfterCompletion(transaction);}return returnValue;
}
复制代码

会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。

对于Provider方法

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {Transaction transaction = null;try {switch (TransactionStatus.valueOf(transactionContext.getStatus())) {case TRYING://使用transactionContext创建分支事务transaction = transactionManager.propagationNewBegin(transactionContext);//执行被切方法逻辑return pjp.proceed();case CONFIRMING:try {//更新事务状态transaction = transactionManager.propagationExistBegin(transactionContext);//提交事务,不走切面的方法transactionManager.commit(asyncConfirm);} catch (NoExistedTransactionException excepton) {//the transaction has been commit,ignore it.}break;case CANCELLING:try {//更新事务状态transaction = transactionManager.propagationExistBegin(transactionContext);//回滚事务,不走切面的方法transactionManager.rollback(asyncCancel);} catch (NoExistedTransactionException exception) {//the transaction has been rollback,ignore it.}break;}} finally {//清理资源transactionManager.cleanAfterCompletion(transaction);}Method method = ((MethodSignature) (pjp.getSignature())).getMethod();//对于 confirm和 cancel 返回空值//主要针对原始类型做处理,因为不能为nullreturn ReflectionUtils.getNullValue(method.getReturnType());
}
复制代码

可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法

对于normal类型 直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可

ConfigurableCoordinatorAspect

ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {}@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
复制代码

相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {//得到当前事务Transaction transaction = transactionManager.getCurrentTransaction();if (transaction != null) {switch (transaction.getStatus()) {case TRYING://只需要在trying的时候把参与者信息提取出来,设置到transaction中去enlistParticipant(pjp);break;case CONFIRMING:break;case CANCELLING:break;}}//执行目标方法return pjp.proceed(pjp.getArgs());
}
复制代码

在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。

接下来看enlistParticipant如何生成参与者对象

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {//获取@Compensable信息Method method = CompensableMethodUtils.getCompensableMethod(pjp);if (method == null) {throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));}Compensable compensable = method.getAnnotation(Compensable.class);String confirmMethodName = compensable.confirmMethod();String cancelMethodName = compensable.cancelMethod();Transaction transaction = transactionManager.getCurrentTransaction();TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {//设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去//这边的上下文设置后就会调用try逻辑FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());}Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());//目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的//confirm的调用上下文InvocationContext confirmInvocation = new InvocationContext(targetClass,confirmMethodName,method.getParameterTypes(), pjp.getArgs());//cancel的调用上下文InvocationContext cancelInvocation = new InvocationContext(targetClass,cancelMethodName,method.getParameterTypes(), pjp.getArgs());Participant participant =new Participant(xid,confirmInvocation,cancelInvocation,compensable.transactionContextEditor());//把participant设置到transaction,并且同步到持久化存储transactionManager.enlistParticipant(participant);}
复制代码

通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。 在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法

ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入

失败补偿机制

对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务

public void init() {try {MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();//设置定时任务执行的对象和方法jobDetail.setTargetObject(transactionRecovery);jobDetail.setTargetMethod("startRecover");jobDetail.setName("transactionRecoveryJob");jobDetail.setConcurrent(false);jobDetail.afterPropertiesSet();CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();cronTrigger.setBeanName("transactionRecoveryCronTrigger");//设置cron表达式cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());cronTrigger.setJobDetail(jobDetail.getObject());cronTrigger.afterPropertiesSet();scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());scheduler.start();} catch (Exception e) {throw new SystemException(e);}
}
复制代码

在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法

public void startRecover() {//获取所有没被处理的transactionList<Transaction> transactions = loadErrorTransactions();//根据规则处理这些transactionrecoverErrorTransactions(transactions);}
复制代码

分别看下上述两个方法的逻辑

private List<Transaction> loadErrorTransactions() {long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();//获取在RecoverDuration间隔之前未完成的transactionreturn transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));}
复制代码
 private void recoverErrorTransactions(List<Transaction> transactions) {for (Transaction transaction : transactions) {//重试次数超过上限的Transaction不再执行补偿if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));continue;}//如果是分支事务,并且超过最长超时时间忽略if (transaction.getTransactionType().equals(TransactionType.BRANCH)&& (transaction.getCreateTime().getTime() +transactionConfigurator.getRecoverConfig().getMaxRetryCount() *transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000> System.currentTimeMillis())) {continue;}try {transaction.addRetriedCount();//对超时的confiring操作重试if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {transaction.changeStatus(TransactionStatus.CONFIRMING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.commit();transactionConfigurator.getTransactionRepository().delete(transaction);} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作|| transaction.getTransactionType().equals(TransactionType.ROOT)) {transaction.changeStatus(TransactionStatus.CANCELLING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.rollback();transactionConfigurator.getTransactionRepository().delete(transaction);}} catch (Throwable throwable) {if (throwable instanceof OptimisticLockException|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);} else {logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);}}}
}
复制代码

注意一点,trying阶段不会重试,失败未处理,会触发canceling操作

思考

分布式事务解决方案

下面列举一些分布式事务解决方案的特性

  1. 传统的二/三阶段提交 这种解决方案会占用数据库事务资源,在互联网公司很少使用
  2. 异步确保型事务 基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功 这个难道是依赖mq支持的事务特性?
  3. 最大努力通知型事务 按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对 目前项目比较常用的方式
  4. tcc 适用于实时性要求比较高,数据必须可靠的场景

我的理解

依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用

对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。

由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。

朋友们,使用或没使用过tcc的,请留下你的想法。

资源链接

tcc-transaction git地址
微服务架构的分布式事务解决方案

转载于:https://juejin.im/post/5ae16721518825673446bbd2

tcc-transation源码分析与思考相关推荐

  1. seata 如何开启tcc事物_分布式事务Seata-TCC源码分析

    为了更好理解分布式事务,首先提出一个问题: 假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚 ...

  2. 【报错】flink源码分析: has no more allocated slots与思考

    文章目录 一. 任务描述与一句话 1. 任务描述 2. 一句话 二. 日志分析 1. 申请一个task manager 2. 大概3分钟后运行这个tm时,报资源找不到 三. 源码分析与报错机制定位 1 ...

  3. 微服务发现与注册之Eureka源码分析

    作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...

  4. 鸿蒙内核源码分析:调度机制篇

    作者 | 深入研究鸿蒙,鸿蒙内核发烧友 出品 | CSDN(ID:CSDNnews) 头图 | CSDN 下载自东方 IC 阅读之前建议先读本系列其他文章,以便对本文任务调度机制的理解. 为什么要学这 ...

  5. 并发编程之——读锁源码分析(解释关于锁降级的争议)

    1. 前言 在前面的文章 并发编程之--写锁源码分析中,我们分析了 1.8 JUC 中读写锁中的写锁的获取和释放过程,今天来分析一下读锁的获取和释放过程,读锁相比较写锁要稍微复杂一点,其中还有一点有争 ...

  6. Kubernetes StatefulSet源码分析

    2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com,Based on Kubernetes 1.9 摘要:Kube ...

  7. [JUC-5]ConcurrentHashMap源码分析JDK8

    在学习之前,最好先了解下如下知识: 1.ReentrantLock的实现和原理. 2.Synchronized的实现和原理. 3.硬件对并发支持的CAS操作及JVM中Unsafe对CAS的实现. 4. ...

  8. Snackbar源码分析

    目录介绍 1.最简单创造方法 1.1 Snackbar作用 1.2 最简单的创建 1.3 Snackbar消失的几种方式 2.源码分析 2.1 Snackbar的make方法源码分析 2.2 对Sna ...

  9. Java 集合系列(四)—— ListIterator 源码分析

    以脑图的形式来展示Java集合知识,让零碎知识点形成体系 Iterator 对比   Iterator(迭代器)是一种设计模式,是一个对象,用于遍历集合中的所有元素.   Iterator 包含四个方 ...

最新文章

  1. 解决mongodb ISODate相差8小时问题
  2. 花最少的钱,训超6的机器人:谷歌大脑推出机器人强化学习平台,硬件代码全开源...
  3. Linux内核--网络栈实现分析(三)--驱动程序层+链路层(上)
  4. 洛谷4400 BlueMary的旅行(分层图+最大流)
  5. codeforces1437 E. Make It Increasing——最长上升子序列
  6. mockito接口没法赋值_Mockito:无法实例化@InjectMocks字段:类型是接口
  7. 在FAANG面试中破解堆算法
  8. IE8 select 动态下拉遇到的问题
  9. Spark实战电影点评系统(一)
  10. UDP方式的网络通信【示例】
  11. 手机麦克风结构原理图_麦克风阵列的基本原理、结构组成及声学效果简介
  12. Google搜索引擎的使用技巧
  13. 出生日期与年龄python_python根据出生日期返回年龄的方法
  14. mysql进阶(十九)SQL语句如何精准查找某一时间段的数据
  15. oracle导出导入同义词,使用datapump 导出导入同义词
  16. Mybatis源码研究序
  17. mysql事务排队情况_MySQL事务问题
  18. 港科资讯 | 香港科技大学与Microsoft香港签订AI商学院2.0合作备忘录
  19. 启幕 Next ’21 大会 - Google Cloud 与您畅谈技术,论道云端
  20. 淮北师范大学c语言试卷,2016年淮北师范大学物理与电子信息学院高级语言程序设计(加试)之C语言程序设计复试笔试最后押题五套卷...

热门文章

  1. CentOS 利用Yum安装mysql后无法启动(MySQL Daemon failed to start.)
  2. (Access denied for user 'root'@'localhost' (using password: NO))
  3. 数量queuepoj1149 PIGS
  4. 亚马逊RDS使用的第三方扩展有漏洞,可导致内部凭据遭泄露
  5. 英国国家网络安全中心:速修复严重的 MobileIron RCE 漏洞 (CVE-2020-15505)
  6. 从零开始--系统深入学习android(实践-让我们开始写代码-新手指南-3.Hello,本地化)...
  7. js文件、图片上传(原生方法和jquery的ajax两种都有)
  8. 安装labelImg
  9. 如何解决Greenplum中无法通过标准命令修复的元数据错误
  10. struts2中的session使用