需求背景:
现在有内容中心(content-center)和 用户中心(user-center)2个微服务,请求内容中心,发送消息给用户中心,完成为指定用户添加积分操作。

文章目录

  • 一、准备工作
    • 1. 版本对照
    • 2. 下载启动RocketMQ
    • 3. 引入maven依赖
  • 二、内容中心(服务端)
    • 2.1. 表结构设计
    • 2.2. 配置MQ信息
    • 2.3. 控制层
    • 2.4. service层
    • 2.5. RocketMQ 事务消息监听
  • 三、用户中心(客户端)
    • 3.1. 依赖
    • 3.2.配置
    • 3.3. 消息监听
    • 开源项目:
一、准备工作
1. 版本对照
RocketMQ 版本 RocketMQ控制台版本 RocketMQ starter版本
RocketMQ 4.8.0 支持RocketMQ 4.8.0 2.2.0
2. 下载启动RocketMQ

linux 环境 RocketMQ 4.8.0 安装、部署控制台
https://blog.csdn.net/weixin_40816738/article/details/116269833

windows下RocketMQ下载、安装、部署、控制台
https://blog.csdn.net/weixin_40816738/article/details/115734482

3. 引入maven依赖
 <!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
二、内容中心(服务端)

消息发送端代码编写

2.1. 表结构设计

share分享表和rocketmq_transaction_logRocketMQ事务日志表2张表,
share

CREATE TABLE IF NOT EXISTS `share` (`id` INT NOT NULL AUTO_INCREMENT COMMENT 'id',`user_id` INT NOT NULL DEFAULT 0 COMMENT '发布人id',`title` VARCHAR(80) NOT NULL DEFAULT '' COMMENT '标题',`create_time` DATETIME NOT NULL COMMENT '创建时间',`update_time` DATETIME NOT NULL COMMENT '修改时间',`is_original` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否原创 0:否 1:是',`author` VARCHAR(45) NOT NULL DEFAULT '' COMMENT '作者',`cover` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '封面',`summary` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '概要信息',`price` INT NOT NULL DEFAULT 0 COMMENT '价格(需要的积分)',`download_url` VARCHAR(256) NOT NULL DEFAULT '' COMMENT '下载地址',`buy_count` INT NOT NULL DEFAULT 0 COMMENT '下载数 ',`show_flag` TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否显示 0:否 1:是',`audit_status` VARCHAR(10) NOT NULL DEFAULT 0 COMMENT '审核状态 NOT_YET: 待审核 PASSED:审核通过 REJECTED:审核不通过',`reason` VARCHAR(200) NOT NULL DEFAULT '' COMMENT '审核不通过原因',PRIMARY KEY (`id`))
ENGINE = InnoDB
COMMENT = '分享表';

rocketmq_transaction_logRocketMQ

-- -----------------------------------------------------
-- Table `rocketmq_transaction_log`
-- -----------------------------------------------------
create table rocketmq_transaction_log
(id             int auto_increment comment 'id'primary key,transaction_Id varchar(45) not null comment '事务id',log            varchar(45) not null comment '日志'
)comment 'RocketMQ事务日志表';

具体详情:见项目源码

2.2. 配置MQ信息
  • 项目内部yml配置
server:port: 8003
spring:application:# 应用名称name: ly-rockketmqprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服务端配置
# MQ name-server地址
rocketmq:name-server: 127.0.0.1:9876producer:#必须指定groupgroup: test-group
2.3. 控制层
package com.gblfy.lyrocketmq.controller;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {private final ShareService shareService;@PutMapping("/audit/{id}")public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {//TODO 认证授权return this.shareService.auditById(id, auditDTO);}
}
2.4. service层
package com.gblfy.lyrocketmq.service;import com.gblfy.api.RemoteProductService;
import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.common.dto.ShareDTO;
import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.common.dto.UserDTO;
import com.gblfy.common.enums.AuditStatusEnum;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.entity.Share;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.mapper.ShareMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Objects;
import java.util.UUID;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareService {private final ShareMapper shareMapper;private final RemoteProductService userCenterFeignClient;private final RocketMQTemplate rocketMQTemplate;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;public ShareDTO findById(Integer id) {Share share = this.shareMapper.selectByPrimaryKey(id);Integer userId = share.getUserId();UserDTO userDTO = this.userCenterFeignClient.findById(userId);ShareDTO shareDTO = new ShareDTO();BeanUtils.copyProperties(share, shareDTO);//设置发布人shareDTO.setWxNickname(userDTO.getWxNickname());return shareDTO;}public Share auditById(Integer id, ShareAuditDTO auditDTO) {// 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常Share share = this.shareMapper.selectByPrimaryKey(id);if (share == null) {throw new IllegalArgumentException("参数非法!该分享不存在!");}if (!Objects.equals("NOT_YET", share.getAuditStatus())) {throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");}//----------------------------------------发送半消息----------------------------------------// 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {//消息idString transactionId = UUID.randomUUID().toString();this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()// Header有妙用).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("share_id", id).build(),//arg有大用处auditDTO);} else {this.auditByIdInDB(id, auditDTO);}return share;}/*** 审批** @param id* @param auditDTO*/public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()).build();this.shareMapper.updateByPrimaryKeySelective(share);}@Transactional(rollbackFor = Exception.class)public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {this.auditByIdInDB(id, auditDTO);this.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享..").build());}
}
2.5. RocketMQ 事务消息监听
package com.gblfy.lyrocketmq.listener;import com.gblfy.common.dto.ShareAuditDTO;
import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog;
import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper;
import com.gblfy.lyrocketmq.service.ShareService;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {private final ShareService shareService;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;/*** 执行本地事务** @param msg 消息header信息* @param arg 消息体* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer share_id = Integer.valueOf((String) headers.get("share_id"));try {this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查,检查本地事务是否成功** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}

详细见源码:本文底部

三、用户中心(客户端)

消息消费端代码编写

3.1. 依赖
 <!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3.2.配置
  • 项目内部yml配置
server:port: 9000
spring:application:# 应用名称name: ly-productprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  • nacos服务端配置
rocketmq:name-server: 127.0.0.1:9876
3.3. 消息监听
package com.gblfy.product.listenner;import com.gblfy.common.dto.UserAddBonusMsgDTO;
import com.gblfy.product.entity.BonusEventLog;
import com.gblfy.product.entity.User;
import com.gblfy.product.mapper.BonusEventLogMapper;
import com.gblfy.product.mapper.UserMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {private final UserMapper userMapper;private final BonusEventLogMapper bonusEventLogMapper;@Overridepublic void onMessage(UserAddBonusMsgDTO message) {// 1. 为用户添加积分Integer userId = message.getUserId();Integer bonus = message.getBonus();User user = this.userMapper.selectByPrimaryKey(userId);user.setBonus(user.getBonus() + bonus);this.userMapper.updateByPrimaryKeySelective(user);// 2.记录日志到bonus_event_log表中this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加积分...").build());log.info("积分添加完毕...");}
}
开源项目:

https://gitee.com/gb_90/micro-service-parent

SpringBoot2.x Nacos RocketMQ 事务消息相关推荐

  1. 实战分析 RocketMQ事务消息

    众所周知,在分布式领域有两大经典理论:CAP 和 BASE.一般情况下,我们将CAP中的数据一致性称为"强一致性",将BASE中的数据一致性称为"最终一致性". ...

  2. 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

    搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...

  3. 通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

    文章转载自公众号  心源意码 , 作者 寻筝 "得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席." 由阿里自研的Roc ...

  4. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  5. RocketMQ事务消息实现分析

    这周RocketMQ发布了4.3.0版本,New Feature中最受关注的一点就是支持了事务消息: 今天花了点时间看了下具体的实现内容,下面是简单的总结. RocketMQ事务消息概要 通过冯嘉发布 ...

  6. RocketMQ 事务消息

    RocketMQ 事务消息在实现上充分利用了 RocketMQ 本身机制,在实现零依赖的基础上,同样实现了高性能.可扩展.全异步等一系列特性. 在具体实现上,RocketMQ 通过使用 Half To ...

  7. 一文详解,RocketMQ事务消息

    在RocketMQ中有一个非常有用的功能,就是事务消息功能,事务消息机制,可以让我们确保发送的消息一定能写进MQ里,绝不会丢失掉. MQ事务消息机制还是挺有用的,在业内还是比较常见的,所以今天我们就来 ...

  8. RocketMQ事务消息及消息索引设计原理

    RocketMQ事务消息 正常事务消息的发送及提交 事务消息的补偿流程 一阶段的half消息如何做到对用户不可见? 回滚之后 pending状态的消息如何变成最终状态 通过Op消息来确定提交或回滚事务 ...

  9. RocketMQ事务消息从生产到消费原理详解(包括回查过程)

    名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...

最新文章

  1. 比特币(包括BTC和BCH)的零确认到底安不安全?
  2. 关于数据存储的那些事1
  3. SIFT特征及特征匹配:SIFT and feature matching
  4. ThreadLocal是什么?
  5. LSTM神经网络 和 GRU神经网络
  6. 大学计算机需要论文吗,大一新生刚开学,是否有必要带电脑?听听辅导员的建议,非常中肯...
  7. 单片机shell命令_MCU调试大法:使用串口实现简单shell功能
  8. 明星开餐厅十店九亏?明星靠“卖面子”能撑多久?
  9. html页面锁屏,JavaScript网页锁屏效果源码实例
  10. 基于Java的在线购书系统
  11. 【交通标志识别】基于matlab GUI BP神经网络交通标志识别(带面板)【含Matlab源码 1647期】
  12. 13种常见软件体系结构风格定义分析、结构图、优缺点
  13. 航空机票预定系统软件结构图
  14. 设计任务调度依赖配置表
  15. Simulink代码生成:Simulink Function子系统及其代码
  16. 关于以太网卡的组成部分:MAC/PHY/变压器
  17. java 建立临时文件夹
  18. Zeppelin集成Spark3
  19. 定点数的运算 —— 逻辑移位、算术移位、循环移位
  20. 《Java8实战》第9章 重构、测试和调试

热门文章

  1. 定了!这个专业研究生扩招,博士生待遇要提高!已有多所高校新增…
  2. 美科技股崩了,Facebook、亚马逊、谷歌一夜蒸发万亿元
  3. (pytorch-深度学习)门控循环单元(GRU)
  4. 语言把数据写入csv文件_把JSON/CSV文件打造成MySQL数据库
  5. 课堂笔记——Data Mining(1)
  6. 当爬虫遇到需要动态ip才能获取资源的时候如何解决?
  7. TCP解决connect函数的超时问题
  8. 开放搜索助力提升趣店商城20%转化率
  9. 2019阿里巴巴技术面试题集锦(含答案)
  10. “大团队”和“敏捷开发”,谁说不可兼得?