【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot
RocketMQ 消息中间件
- 集成 SpringBoot
- 入门案例
- 生产消息类型 - 同步、异步、一次性
- 消费模式 - 集群、广播
- 延时消息
- 设置消息标签
- 设置消息的 Key
- 自定义属性设置
- 消息过滤
- 发送消息的方式
Java 从 0 到架构师目录:【Java从0到架构师】学习记录
集成 SpringBoot
入门案例
依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>
生产者:
- 配置文件:
rocketmq.name-server=192.168.52.128:9876
rocketmq.producer.group=my_group
server.port=9999
- 实现代码:
@RestController
public class HelloController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("01_hello")public String sendMsg(String message) throws Exception{SendResult sendResult = rocketMQTemplate.syncSend("01_boot_hello", message);return JSON.toJSONString(sendResult);}
}
消费者:
- 配置文件:
rocketmq.name-server=192.168.52.128:9876
server.port=7777
- 实现代码:
@Component
@RocketMQMessageListener(topic = "01_boot_hello",consumerGroup = "maoge_consumer"
)
public class HelloConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("接收到的消息:" + msg);}
}
生产消息类型 - 同步、异步、一次性
同步消息:syncSend
rocketMQTemplate.syncSend("01_boot_hello", message);
异步消息:asyncSend
rocketMQTemplate.asyncSend("02_boot_async", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功:" + JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {System.out.println("消息处理失败:" + throwable.getMessage());}
});
一次性消息:sendOneWay
rocketMQTemplate.sendOneWay("03_boot_oneway",message);
消费模式 - 集群、广播
集群模式:
@Component
@RocketMQMessageListener(topic = "02_boot_model",messageModel = MessageModel.CLUSTERING,consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer01 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(JSON.toJSONString(message));}
}
广播模式:
// 广播模式是实时消费消息的,在广播模式消费者启动之前的消息,无法接收
// 广播模式下发送失败的消息不会重试
@Component
@RocketMQMessageListener(topic = "02_boot_model",messageModel = MessageModel.BROADCASTING,consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer02 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(JSON.toJSONString(message));}
}
延时消息
使用原生的 Produce 对象:
DefaultMAProducer producer = rocketMQTemplate.getProducer();
Message message = new Message(topic, "TagA", "9527", msg.getBytes());
message.setDelayTimeLevel(3);
// 在实际工作中,确保消息可靠性,捕获对应的异常
producer.send(message);
使用 Spring 接口:
Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.syncSend("04_boot_delay", msg, 3000, 3);
设置消息标签
// 在发送的消息 Topic:Tag 中间使用冒号隔开
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
设置消息的 Key
Message<?> msg = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_KEYS, "1100").build();
rocketMQTemplate.send("01-boot-hello", msg);
自定义属性设置
// 过滤设置: 需要开启 broker 的支持用户属性配置
// enablePropertyFilter=trueMap<String,Object> map=new HashMap<>();
//用户自定义属性
map.put("name", "hesj");
map.put("age", "18");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
消息过滤
// 在RocketMQMessageListener添加注解
@RocketMQMessageListener(consumerGroup = "02_boot_cluster",topic = "02_boot_model",messageModel = MessageModel.BROADCASTING,// 消息过滤selectorType = SelectorType.TAG,selectorExpression = "age > 16"
)
发送消息的方式
- 直接使用 rocketMQTemplate
- 使用 DefaultMQProducer 对象
- 使用 Spring 的 Message 接口
【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot相关推荐
- 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息
RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...
- 【Java从0到架构师】RocketMQ 基础 - 应用、核心组件、安装
RocketMQ 消息中间件 基础知识 消息中间件的应用 异步解耦 削峰填谷 消息分发 RocketMQ 核心组件 RocketMQ 安装 源码安装 修改配置参数 启动 管理控制台安装 测试项目 Ja ...
- 【Java从0到架构师(1),Java中高级面试题总结(全面)
JSP 九大内置对象 MySQL 基础 + 多表查询 [Java从0到架构师]MySQL 基础 MySQL MySQL 的使用步骤 数据库的内部存储细节 GUI 工具 SQL 语句 DDL 语句 DD ...
- 【Java从0到架构师】SpringCloud - Sleuth、Zipkin、Config
SpringCloud 链路追踪组件 Sleuth Zipkin 分布式配置中心 - Config Git + Config 分布式配置中心 Java 从 0 到架构师目录:[Java从0到架构师]学 ...
- 【Java从0到架构师】SpringCloud - Hystrix、Zuul
SpringCloud 基本概念 熔断和降级 服务雪崩效应 服务熔断与降级 - Hystrix SpringBoot 集成 Hystrix 熔断降级服务异常报警通知 重点属性 - 熔断隔离策略.超时时 ...
- 【Java从0到架构师】SpringCloud - Eureka、Ribbon、Feign
SpringCloud 分布式.微服务相关概念 微服务框架构选型 SpringCloud 概述 服务注册与发现 - Eureka 案例项目 Eureka 自我保护机制 微服务调用方式 - Ribbon ...
- 【Java从0到架构师】Dubbo 基础 - 设置启动时检查、直接提供者、线程模型、负载均衡、集群容错、服务降级
Dubbo 分布式 RPC 分布式核心基础 分布式概述 RPC Dubbo Dubbo 入门程序 - XML.注解 部署管理控制台 Dubbo Admin 修改绑定的注册 IP 地址 设置启动时检查 ...
- 【Java从0到架构师】分布式框架通信核心基础 - 序列化(JDK、Protobuf)、远程过程调用 RMI
分布式框架通信核心基础 序列化 JDK 的序列化 JDK 序列化的一些细节 Protobuf 序列化 Protobuf 环境搭建与操作 Protobuf 原理分析 实际数据传输 序列化技术选型 远程过 ...
- 【Java从0到架构师】Nginx 拓展 - HTTPS支持、缓存、Http请求防盗链、限流、高可用(Keepalived)
Nginx HTTPS 支持 SSL 证书 重写 - rewrite Nginx 跨域解决方案 Nginx 开启缓存 Http 请求防盗链 Nginx 限流方案 常见的限流策略 - 计数器算法.漏桶算 ...
最新文章
- 安卓在线按钮设计网站
- dp进阶之FFT加速+数据结构优化+不等式优化
- vscode终端无法输入
- AFNetWorking 对汉字部分UTF-8编码
- Visual Studio Code——待补充
- Tomcat启动过程(一):从BootStrap到EndPonit
- Spring框架 @ResponseBody注解 编码问题: 论设置 Accept 的重要性
- 在Vmware中安装archlinux(2008.3core)的流程与心得
- 【RDMA】qp数量和RDMA性能(节选翻译)|连接数
- Antd如何用Menu组件渲染二级或三级目录
- 基础网络连接及拓扑图
- cisco交换机配置方法
- C#实现的基于SMTP协议的E-MAIL电子邮件发送客户端软件
- 摄像头和机械臂的手眼标定
- qt调用vc编写库文件的方法
- gazebo中视觉仿真怎么使用自定义贴图的问题
- 1000000转万 js_javascript将数字转换整数金额大写的方法
- eclipse中安装的lombok插件不起作用,报错:找不到getter
- Python数据分析与机器学习实战<三>Pandas
- 10个精致的企业网站设计模板欣赏及点评
热门文章
- 农村人深加工红薯,一招增值数倍,机器一条龙操作省人力
- 人有钱了就显得素质高,也显得懂事了
- Mybatis_day4_Mybatis的延迟加载
- linux shell spool,Linux/Unix shell 脚本中调用SQL,RMAN脚本
- power bi 创建空表_使用Power BI创建足球热图
- ssas报表项目数据集_处理SSAS多维OLAP多维数据集的有效方法
- 获取request的json数组对象
- Excel 2007 Open XML文件结构~~~1
- Django 视图层
- ruby 疑难点之—— attr_accessor attr_reader attr_writer