RocketMQ 消息中间件

  • 集成 SpringBoot
    • 入门案例
    • 生产消息类型 - 同步、异步、一次性
    • 消费模式 - 集群、广播
    • 延时消息
    • 设置消息标签
    • 设置消息的 Key
    • 自定义属性设置
    • 消息过滤
    • 发送消息的方式

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

集成 SpringBoot

入门案例

依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version>
</dependency>

生产者

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
rocketmq.producer.group=my_group
server.port=9999
  • 实现代码:
@RestController
public class HelloController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@RequestMapping("01_hello")public String sendMsg(String message) throws Exception{SendResult sendResult = rocketMQTemplate.syncSend("01_boot_hello", message);return JSON.toJSONString(sendResult);}
}

消费者

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
server.port=7777
  • 实现代码:
@Component
@RocketMQMessageListener(topic = "01_boot_hello",consumerGroup = "maoge_consumer"
)
public class HelloConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("接收到的消息:" + msg);}
}

生产消息类型 - 同步、异步、一次性

同步消息syncSend

rocketMQTemplate.syncSend("01_boot_hello", message);

异步消息asyncSend

rocketMQTemplate.asyncSend("02_boot_async", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功:" + JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {System.out.println("消息处理失败:" + throwable.getMessage());}
});

一次性消息sendOneWay

rocketMQTemplate.sendOneWay("03_boot_oneway",message);

消费模式 - 集群、广播

集群模式

@Component
@RocketMQMessageListener(topic = "02_boot_model",messageModel = MessageModel.CLUSTERING,consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer01 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(JSON.toJSONString(message));}
}

广播模式

// 广播模式是实时消费消息的,在广播模式消费者启动之前的消息,无法接收
// 广播模式下发送失败的消息不会重试
@Component
@RocketMQMessageListener(topic = "02_boot_model",messageModel = MessageModel.BROADCASTING,consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer02 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(JSON.toJSONString(message));}
}

延时消息

使用原生的 Produce 对象

DefaultMAProducer producer =  rocketMQTemplate.getProducer();
Message message = new Message(topic, "TagA", "9527", msg.getBytes());
message.setDelayTimeLevel(3);
// 在实际工作中,确保消息可靠性,捕获对应的异常
producer.send(message);

使用 Spring 接口

Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.syncSend("04_boot_delay", msg, 3000, 3);

设置消息标签

// 在发送的消息 Topic:Tag 中间使用冒号隔开
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);

设置消息的 Key

Message<?> msg = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_KEYS, "1100").build();
rocketMQTemplate.send("01-boot-hello", msg);

自定义属性设置

// 过滤设置: 需要开启 broker 的支持用户属性配置
// enablePropertyFilter=trueMap<String,Object> map=new HashMap<>();
//用户自定义属性
map.put("name", "hesj");
map.put("age", "18");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);

消息过滤

// 在RocketMQMessageListener添加注解
@RocketMQMessageListener(consumerGroup = "02_boot_cluster",topic = "02_boot_model",messageModel = MessageModel.BROADCASTING,//  消息过滤selectorType = SelectorType.TAG,selectorExpression = "age > 16"
)

发送消息的方式

  • 直接使用 rocketMQTemplate
  • 使用 DefaultMQProducer 对象
  • 使用 Spring 的 Message 接口

【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot相关推荐

  1. 【Java从0到架构师】RocketMQ 使用 - 发送消息、接收消息、特殊消息

    RocketMQ 消息中间件 入门案例 NameServer 地址 发送消息 同步发送消息 异步发送消息 一次性发送消息 生产者组.消息封装 接收消息 消费方式:推式消费.拉式消费 消息方式:集群模式 ...

  2. 【Java从0到架构师】RocketMQ 基础 - 应用、核心组件、安装

    RocketMQ 消息中间件 基础知识 消息中间件的应用 异步解耦 削峰填谷 消息分发 RocketMQ 核心组件 RocketMQ 安装 源码安装 修改配置参数 启动 管理控制台安装 测试项目 Ja ...

  3. 【Java从0到架构师(1),Java中高级面试题总结(全面)

    JSP 九大内置对象 MySQL 基础 + 多表查询 [Java从0到架构师]MySQL 基础 MySQL MySQL 的使用步骤 数据库的内部存储细节 GUI 工具 SQL 语句 DDL 语句 DD ...

  4. 【Java从0到架构师】SpringCloud - Sleuth、Zipkin、Config

    SpringCloud 链路追踪组件 Sleuth Zipkin 分布式配置中心 - Config Git + Config 分布式配置中心 Java 从 0 到架构师目录:[Java从0到架构师]学 ...

  5. 【Java从0到架构师】SpringCloud - Hystrix、Zuul

    SpringCloud 基本概念 熔断和降级 服务雪崩效应 服务熔断与降级 - Hystrix SpringBoot 集成 Hystrix 熔断降级服务异常报警通知 重点属性 - 熔断隔离策略.超时时 ...

  6. 【Java从0到架构师】SpringCloud - Eureka、Ribbon、Feign

    SpringCloud 分布式.微服务相关概念 微服务框架构选型 SpringCloud 概述 服务注册与发现 - Eureka 案例项目 Eureka 自我保护机制 微服务调用方式 - Ribbon ...

  7. 【Java从0到架构师】Dubbo 基础 - 设置启动时检查、直接提供者、线程模型、负载均衡、集群容错、服务降级

    Dubbo 分布式 RPC 分布式核心基础 分布式概述 RPC Dubbo Dubbo 入门程序 - XML.注解 部署管理控制台 Dubbo Admin 修改绑定的注册 IP 地址 设置启动时检查 ...

  8. 【Java从0到架构师】分布式框架通信核心基础 - 序列化(JDK、Protobuf)、远程过程调用 RMI

    分布式框架通信核心基础 序列化 JDK 的序列化 JDK 序列化的一些细节 Protobuf 序列化 Protobuf 环境搭建与操作 Protobuf 原理分析 实际数据传输 序列化技术选型 远程过 ...

  9. 【Java从0到架构师】Nginx 拓展 - HTTPS支持、缓存、Http请求防盗链、限流、高可用(Keepalived)

    Nginx HTTPS 支持 SSL 证书 重写 - rewrite Nginx 跨域解决方案 Nginx 开启缓存 Http 请求防盗链 Nginx 限流方案 常见的限流策略 - 计数器算法.漏桶算 ...

最新文章

  1. 安卓在线按钮设计网站
  2. dp进阶之FFT加速+数据结构优化+不等式优化
  3. vscode终端无法输入
  4. AFNetWorking 对汉字部分UTF-8编码
  5. Visual Studio Code——待补充
  6. Tomcat启动过程(一):从BootStrap到EndPonit
  7. Spring框架 @ResponseBody注解 编码问题: 论设置 Accept 的重要性
  8. 在Vmware中安装archlinux(2008.3core)的流程与心得
  9. 【RDMA】qp数量和RDMA性能(节选翻译)|连接数
  10. Antd如何用Menu组件渲染二级或三级目录
  11. 基础网络连接及拓扑图
  12. cisco交换机配置方法
  13. C#实现的基于SMTP协议的E-MAIL电子邮件发送客户端软件
  14. 摄像头和机械臂的手眼标定
  15. qt调用vc编写库文件的方法
  16. gazebo中视觉仿真怎么使用自定义贴图的问题
  17. 1000000转万 js_javascript将数字转换整数金额大写的方法
  18. eclipse中安装的lombok插件不起作用,报错:找不到getter
  19. Python数据分析与机器学习实战<三>Pandas
  20. 10个精致的企业网站设计模板欣赏及点评

热门文章

  1. 农村人深加工红薯,一招增值数倍,机器一条龙操作省人力
  2. 人有钱了就显得素质高,也显得懂事了
  3. Mybatis_day4_Mybatis的延迟加载
  4. linux shell spool,Linux/Unix shell 脚本中调用SQL,RMAN脚本
  5. power bi 创建空表_使用Power BI创建足球热图
  6. ssas报表项目数据集_处理SSAS多维OLAP多维数据集的有效方法
  7. 获取request的json数组对象
  8. Excel 2007 Open XML文件结构~~~1
  9. Django 视图层
  10. ruby 疑难点之—— attr_accessor attr_reader attr_writer