文章目录

  • 1:使用场景介绍
  • 2:方案原理介绍
  • 3:开发步骤-生产者
    • 1:pom
    • 2:application配置
    • 3:配置mq
    • 4:配置定时任务,完成支付状态发入mq
    • 5:开启启动类注解
  • 4:开发步骤-消费者
    • 1-2 基本配置一样
    • 3:配置mq,包括重试机制
    • 4:配置消费者消费

1:使用场景介绍

这种方案是基于分布式服务理论从业务层面做出的设计,【消息队列+定时任务+本地事件表】方案就是消息队列搭配上本地事件表实现的分布式事务,这种方案的特点就是通过服务和消息队列的整合,可以对服务实现解耦,缺点是异步执行
比较适用于哪些业务简单,数据量小,并且不要求同步执行的场景。说白了,如果项目选型时,大部分人不会分布式事务框架,可以用这样的业务设计来提到

2:方案原理介绍

还是按照在电商系统中,调用支付系统完成后,然后支付系统调用订单系统更改订单状态


注:

在2-1,-2,-3这样的场景中,一般是先调用本地db,然后再调用第三方。
其实其他业务也一样,比如我们假设一个场景,如果我们需要调用第三方接口,还要执行我们自己的sql逻辑,那么开发时的顺序肯定是先执行调用第三方的,然后再执行我们的。毕竟我们自己的db可以通过事务回滚,第三方已经调走了,收不回来了;

3:开发步骤-生产者

1:pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.mashibing.serviceorder</groupId><artifactId>service-order</artifactId><version>0.0.1-SNAPSHOT</version><name>service-order</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--activemq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--activemq pool--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><!-- mysql:MyBatis相关依赖 --><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.0.0</version></dependency><!-- mysql:mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!-- mysql:阿里巴巴数据库连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.12</version></dependency><!-- json-lib --><dependency><groupId>net.sf.json-lib</groupId><artifactId>json-lib</artifactId><version>2.4</version><classifier>jdk15</classifier></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2:application配置


server:port: 5001#应用名称及验证账号
spring:application:name: service-orderactivemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: adminpool:enabled: truemax-connections: 100datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/service-order?characterEncoding=UTF-8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdbcp2:initial-size: 5min-idle: 5max-total: 5max-wait-millis: 200validation-query: SELECT 1test-while-idle: truetest-on-borrow: falsetest-on-return: falsemybatis:mapper-locations:- classpath:mapper/*.xml

3:配置mq

// 配置消息发送端
@Configuration
public class ActiveMQConfig {@Value("${mqurl}")private String brokerUrl;// 创建队列,用于使用JMS发送消息@Beanpublic Queue queue() {return new ActiveMQQueue("ActiveMQQueue");}// 作为消息的发起方,创建一个消息队列连接工厂,供消费者连接和监听@Beanpublic ActiveMQConnectionFactory connectionFactory(){return new ActiveMQConnectionFactory(brokerUrl);}
}

4:配置定时任务,完成支付状态发入mq

@Component
public class ProduceTask {@Autowiredprivate TblOrderEventDao tblOrderEventDao;@Autowiredprivate Queue queue;@AutowiredJmsMessagingTemplate jmsMessagingTemplate;// 创建一分钟一次的定时任务@Scheduled(cron="0 */1 * * * ?")@Transactional(rollbackFor = Exception.class)public void task(){// 取到数据库本地事务表状态为【新建】的数据List<TblOrderEvent> tblOrderEventList = tblOrderEventDao.selectByOrderType("0");for (int i = 0; i < tblOrderEventList.size(); i++) {TblOrderEvent event = tblOrderEventList.get(i);// 更改这条数据的状态为【已发送】tblOrderEventDao.updateEvent(event.getOrderType());System.out.println("修改数据库完成");// 使用jmsMessagingTemplate发送信息至消息队列jmsMessagingTemplate.convertAndSend(   queue,JSONObject.fromObject(event).toString());}}
}

5:开启启动类注解

@SpringBootApplication
@EnableJms // 支持消息传递的注解
@EnableScheduling   // 支持定时任务的注解
public class ServiceOrderApplication {public static void main(String[] args) {SpringApplication.run(ServiceOrderApplication.class, args);}
}

4:开发步骤-消费者

1-2 基本配置一样

3:配置mq,包括重试机制

// 配置消息消费端
@Configuration
public class ActiveMQConfig {@Value("${mqurl}")private String brokerUrl;@Value("${mquser}")private String user;@Value("${mqpassword}")private String password;/*** 作为消费方,根据activeMQ地址连接MQ工厂并设置重发机制* @param redeliveryPolicy* @return*/@Beanpublic ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(user,password,brokerUrl);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);return activeMQConnectionFactory;}/*** 如果一个消息消费失败了,MQ工厂重发消息供消费者消费* @return*/@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();return redeliveryPolicy;}/*** 设置消息队列 确认机制* 设置手动确认,消费者确认之后才表明消费成功* @param activeMQConnectionFactory* @return*/@Beanpublic JmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory){DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setConnectionFactory(activeMQConnectionFactory);// 1: 自动确认,2: 客户端手动确认,3:自动批量确认,4 事务提交并确认。bean.setSessionAcknowledgeMode(2);return bean;}
}

4:配置消费者消费

@Component
public class ConsumerQueue {@Autowiredprivate TblPayEventDao tblPayEventDao;// destination属性值意为监听的目标,来源于消息发起者创建消息队列的nama属性值。// containerFactory属性值意为容器工厂,来源于消息消费者配置的监听消息的容器工厂。@JmsListener(destination = "ActiveMQQueue",containerFactory = "jmsListenerContainerFactory")public void receive(TextMessage textMessage, Session session) throws JMSException {try {System.out.println("收到的消息:"+textMessage.getText());String content = textMessage.getText();// 将传来信息转成对象并操作数据库TblPayEvent tblPayEvent = (TblPayEvent) JSONObject.toBean(JSONObject.fromObject(content), TblPayEvent.class);tblPayEventDao.insert(tblPayEvent);// 业务完成,确认消息 消费成功// 因为设置了手动确认,所以,需要调用消息的acknowledge方法textMessage.acknowledge();}catch (Exception e){// 回滚消息e.printStackTrace();System.out.println("异常了");// 消息消费失败,恢复回消息队列,等待重发再次消费。session.recover();}}/*** 监听死信队列,按需写方法内的业务逻辑* 补偿 处理(人工,脚本)。自己根据自己情况。* @param text*/@JmsListener(destination = "DLQ.ActiveMQQueue")public void receive2(String text){//发邮件,人工干预}
}

分布式事务讲解 -消息队列+定时任务+本地事件表相关推荐

  1. 我说分布式事务之消息最终一致性事务(二):RocketMQ的实现

    来源:https://0x9.me/A76YN 号外:最近整理了一下以前编写的一系列Spring Boot内容,整了个<Spring Boot基础教程>的PDF,关注我,回复:001,快来 ...

  2. 分布式事务讲解 - TX-LCN分布式事务框架(含LCN、TCC、TXC三种模式)

    分布式事务讲解 - TX-LCN分布式事务框架(含LCN.TCC.TXC三种模式) 分布式事务系列博客: TX-LCN框架原理 LCN 原理及主要特点 代码实现 实现场景 创建数据库及表(三个数据库, ...

  3. 分布式事务之消息补偿解决方案

    分布式事务之消息补偿解决方案 参考文章: (1)分布式事务之消息补偿解决方案 (2)https://www.cnblogs.com/lanxiaoke/p/8321657.html 备忘一下.

  4. python 消息队列 get是从队首还是队尾取东西_python分布式爬虫中消息队列知识点详解...

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的 ...

  5. python 消息队列 flask_python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的 ...

  6. 腾讯云分布式高可靠消息队列 CMQ 架构

    在分布式大行其道的今天,我们在系统内部.平台之间广泛运用消息中间件进行数据交换及解耦.CMQ 是腾讯云内部自研基于的高可靠.强一致.可扩展分 布式消息队列,在腾讯内部包括微信手机 QQ 业务红包.腾讯 ...

  7. 我说分布式事务之消息最终一致性事务(一):原理及实现

    来源:https://0x9.me/YgaUc 在之前的文章中,我们已经学习总结了分布式事务的两种解决方案. 我说分布式事务之TCC 我说分布式事务之最大努力通知型事务 本文我们将学习到另一种常见的柔 ...

  8. 实力分享,聚焦分布式高可用消息队列

    消息队列(Message Queue),是分布式系统中非常重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息 ...

  9. 腾讯云分布式高可靠消息队列CMQ架构

    针对金融.交易.订单等对可靠性.可用性有较高要求的业务场景,本文分享如何通过CMQ消息队列实现高可用架构 作者:张浩         出处:腾云阁文章 ---------------------- 在 ...

最新文章

  1. 优化方法的基本认识 overview
  2. Java使用预定格式获取时间字符串
  3. STL:使用string、vector、complex和limits
  4. SpringCloud学习成长之 十一 Docker部署cloud项目
  5. 光纤收发器的7大挑选技巧
  6. IDEA配置TeaVM插件
  7. Linux系统移植概述
  8. Mac和PC在工作中管理的对比(1)
  9. python函数拟合
  10. xxl-job源码解读:调度器schedule
  11. MPAndroidChart使用(BarChart为例)
  12. 程序员代码下的许豪杰
  13. python获取文件夹名称、文件名、去除后缀的文件名、文件改名等
  14. 360wifi在linux系统如何使用,在树莓派上使用360WIFI(也适用于小米、百度、腾讯WIFI)...
  15. ABC3D创客项目:小风扇
  16. 解决Pixel手机时间不能自动同步
  17. Android开发(一):Android Studio及SDK下载安装教程2020
  18. (附源码)python+mysql+基于springboot小型车队管理系统 毕业设计061709
  19. 软件测试的培训机构靠谱吗
  20. 【猪脸识别哪家强?】智能养猪成千万级别饲养规模杀手锏

热门文章

  1. android --------- Android10系统上访问本地相机下的视频文件报错 /storage/emulated/0/DCIM/Camera/ open failed: EACCES
  2. 透过细节看日本(转)
  3. nacos服务注册不上
  4. mysql注入实验报告_网络安全实验报告 第二章
  5. EAX、EBX、ECX、EDX、ESI、EDI、ESP、EBP 寄存器详解
  6. zoom如何使用网页版登录
  7. Unity_粒子特效
  8. 基于博客系统的访客日志记录----代码合集
  9. 前端开发常用PhotoShop快捷键整理(更新中)
  10. 微信授权之登录、注册、关联微信号全流程图