一、RocketMQ 支持 3 种消息发送方式 :

1、同步消息(sync message )

producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 。

2、异步消息(async message)

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

3、单向消息(oneway message)

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

二、RocketMQ消息结构

RocketMQ的消息包括基础属性和扩展属性两部分:

1、基础属性

1)topic : 主题相当于消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中,比方说一个电商系统可以分为商品消息、订单消息、物流消息等,就可以在broker中创建商品主题、订单主题等,所有商品的消息发送至该主题下的消息队列中。

2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M。

3) 消息 Flag:消息的一个标记,RocketMQ不处理,留给业务系统使用。

2、扩展属性

1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空 。

2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息, 可为空 。 3)waitStoreMsgOK :消息发送时是否等消息存储完成后再返回 。

Message 的基础属性主要包括消息所属主题 topic , 消息 Flag(RocketMQ 不做处理)、 扩展属性、消息体 。

三、同步消息

1、创建test-rocketmq生产者工程

  1. 创建一个test-rocketmq的测试工程专门用于rocketmq的功能测试。

test-rocketmq父工程的pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>mq</artifactId><groupId>com.pbteach</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion>​<artifactId>test-rocketmq</artifactId><packaging>pom</packaging><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>​</project>

2)创建rocketmq-producer生产者工程

rocketmq-producer的pom.xml如下

 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>test-rocketmq</artifactId><groupId>com.pbteach</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion>​<artifactId>rocketmq-producer</artifactId>​​</project>

3) 新建rocketmq-producer工程 的application.yml文件

 server:port: 8181 #服务端口servlet:context-path: /rocketmq-producer​spring:application:name: rocketmq-producer #指定服务名rocketmq:nameServer: 127.0.0.1:9876producer:group: demo-producer-group

4)新建启动类

 /*** @author 攀博课堂(www.pbteach.com)* @version 1.0**/
@SpringBootApplicationpublic class ProducerApplication {​public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}​}

2、发送同步消息

package com.pbteach.test.rocketmq.message;​import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;​/*** rocketmq发送消息类* @author 攀博课堂(www.pbteach.com)* @version 1.0**/@Componentpublic class ProducerSimple {​@Autowiredprivate RocketMQTemplate rocketMQTemplate;​/*** 发送同步消息* @param topic* @param msg*/public void sendSyncMsg(String topic, String msg){rocketMQTemplate.syncSend(topic,msg);}​​}


3、测试

1)在test下编写测试类,发送同步消息。

package com.pbteach.test.rocketmq.message;​import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;​/*** @author 攀博课堂(www.pbteach.com)* @version 1.0**/@RunWith(SpringRunner.class)@SpringBootTestpublic class ProducerSimpleTest {​@Autowiredprivate ProducerSimple producerSimple;​//测试发送同步消息@Testpublic void testSendSyncMsg(){this.producerSimple.sendSyncMsg("my-topic", "第一条同步消息");System.out.println("end...");}​}​

2)启动NameServer、Broker、管理端

3)执行testSendSyncMsg方法

4)观察控制台和管理端

控制台出现end… 表示消息发送成功。

进入管理端,查询消息。


4、创建消费者工程
1)创建消息消费者工程,pom.xml如下

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>test-rocketmq</artifactId><groupId>com.pbteach</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion>​<artifactId>rocketmq-consumer</artifactId>​​</project>

2)启动类

/*** @author 攀博课堂(www.pbteach.com)* @version 1.0**/@SpringBootApplicationpublic class ConsumerApplication {​public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}​}

3)配置文件application.yml

server:port: 8182 #服务端口servlet:context-path: /rocketmq-consumer​spring:application:name: rocketmq-consumer #指定服务名rocketmq:nameServer: 127.0.0.1:9876


4)编写消费消息监听类:

package com.pbteach.test.rocketmq.message;​import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;​/*** 消费消息监听类* @author 攀博课堂(www.pbteach.com)* @version 1.0**/@Component@RocketMQMessageListener(topic = "my-topic",consumerGroup = "demo-consumer-group")public class ConsumerSimple implements RocketMQListener<String> {​//接手到消息调用此方法@Overridepublic void onMessage(String s) {System.out.println(s);}}​

监听消息队列 需要指定:

topic:监听的主题

consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群。

5、测试

1、启动消费者工程

启动消费者工程,观察控制台输出“第一条同步消息”消息内容,这说明从消息队列已经读取到消息。

2、保证消费者工程已启动,再次发送消息,观察控制台是否输出“第一条同步消息”消息内容,输出则说明接收消息成功。

四、消息发送过程

通过测试对同步消息的发送和接收有一个粗略的认识,下边分析具体的消息发送过程,如下图:

消息发送流程如下:

1、Producer从NameServer中获取主题路由信息

Broker将自己的状态上报给NameServer,NameServer中存储了每个Broker及主题、消息队列的信息。

Producer根据 topic从NameServer查询所有消息队列,查询到的结果例如:

 [{"brokerName":"Broker-1","queueId":0},{"brokerName":"Broker-1","queueId":1},{"brokerName":"Broker-2","queueId":0},{"brokerName":"Broker-2","queueId":1}]

Producer按选择算法从以上队列中选择一个进行消息发送,如果发送消息失败则在下次选择的时候 会规避掉失败的broker。

2、构建消息,发送消息

发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等(topic、消息体,生产组等)。

如果该topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列 。

为什么要多个队列 ?

1)高可用

当某个队列不可用时其它队列顶上。

2)提高并发

发送消息是选择队列进行发送,提高发送消息的并发能力。

消息消费时每个消费者可以监听多个队列,提高消费消息的并发能力。

生产组有什么用?

在事务消息中broker需要回查producer,同一个生产组的producer组成一个集群,提高并发能力。

3、监听队列,消费消息

一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。

一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。

消费组有两种消费模式:

1)集群模式

一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。

2)广播模式

主题下的一条消息能被消费组下的所有消费者消费。

消费者和broker之间通过推模式和拉模式接收消息,推模式即broker推送给消费者,拉模式是消费者主动从broker查询消息。

五、异步消息

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

在ProducerSimple中编写发送异步消息的方法

/*** 发送异步消息* @author 攀博课堂(www.pbteach.com)* @param topic* @param msg*/public void sendASyncMsg(String topic, String msg){rocketMQTemplate.asyncSend(topic,msg,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//成功回调System.out.println(sendResult.getSendStatus());}​@Overridepublic void onException(Throwable e) {//异常回调System.out.println(e.getMessage());}});}

测试:

 /*** 测试类* @author 攀博课堂(www.pbteach.com)* @version 1.0**/
@Testpublic void testSendASyncMsg() throws InterruptedException {this.producerSimple.sendASyncMsg("my-topic", "第一条异步步消息");System.out.println("end...");//异步消息,为跟踪回调线程这里加入延迟Thread.sleep(3000);}

六、单向消息

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

 /*** 发送单向消息* @author 攀博课堂(www.pbteach.com)* @param topic* @param msg*/public void sendOneWayMsg(String topic, String msg){this.rocketMQTemplate.sendOneWay(topic,msg);}

测试:

略。

七、延迟消息

1、延迟消息介绍

延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。要实现该功能的话,可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,判断该订单的支付状态,如果处于未支付状态,则将该订单关闭。

RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可。

2、同步消息延迟

生产端:

 /*** 发送延迟消息* 消息内容为json格式* @author 攀博课堂(www.pbteach.com)*/public void sendMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {//发送同步消息,消息内容将orderExt转为jsonMessage<OrderExt> message = MessageBuilder.withPayload(orderExt).build();//指定发送超时时间(毫秒)和延迟等级this.rocketMQTemplate.syncSend(topic,message,1000,3);​System.out.printf("send msg : %s",orderExt);}

消费端:

同自定义消息格式章节。

测试:

//测试发送同步消息@Testpublic void testSendMsgByJsonDelay() throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {OrderExt orderExt = new OrderExt();orderExt.setId(UUID.randomUUID().toString());orderExt.setCreateTime(new Date());orderExt.setMoney(168L);orderExt.setTitle("测试订单");this.producerSimple.sendMsgByJsonDelay("my-topic-obj",orderExt);System.out.println("end...");}

3、异步消息延迟

生产端:

 /*** 发送异步延迟消息* 消息内容为json格式* @author 攀博课堂(www.pbteach.com)*/public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {//消息内容将orderExt转为jsonString json = this.rocketMQTemplate.getObjectMapper().writeValueAsString(orderExt);org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic,json.getBytes(Charset.forName("utf-8")));//设置延迟等级message.setDelayTimeLevel(3);//发送异步消息this.rocketMQTemplate.getProducer().send(message,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}​@Overridepublic void onException(Throwable throwable) {System.out.println(throwable.getMessage());}});​​System.out.printf("send msg : %s",orderExt);}

消费端:

同自定义消息格式章节。

测试

//测试发送异步消息@Testpublic void testSendAsyncMsgByJsonDelay() throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {OrderExt orderExt = new OrderExt();orderExt.setId(UUID.randomUUID().toString());orderExt.setCreateTime(new Date());orderExt.setMoney(168L);orderExt.setTitle("测试订单");this.producerSimple.sendAsyncMsgByJsonDelay("my-topic-obj",orderExt);System.out.println("end...");Thread.sleep(20000);}

八、消费重试

1、什么是消费重试
当消息发送到Broker成功,在被消费者消费时如果消费者没有正常消费,此时消息会重试消费。消费重试存在两种场景:

1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。

2)当消息已经被消费者成功接收,但是在进行消息处理时出现异常,消费端无法向Broker返回成功,这种情况下RocketMQ会不断重试。本小节重点讨论第二个场景。

针对第二种消费重试的场景,borker是怎么知道重试呢?

消费者在消费消息成功会向broker返回成功状态,否则会不断进行消费重试。

2、处理策略
当消息在消费时出现异常,此时消息被不断重试消费。RocketMQ会一直重试消费吗?

答案是不会!

消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1。

比如:一条消息消费失败,等待10s(第3级)进行重试,如果还没有被成功消费则延迟等级加1,即按第4级别延迟等待,等30s继续进行重试,如此进行下去,直到重试16次。

当重试了16次还未被成功消费将会投递到死信队列,到达死信队列的消息将不再被消费。

实际生产中的处理策略是什么呢?

实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处理。

项目使用的Spring整合RocketMQ的方式,消费者实现RocketMQListener的onMessage方法,在此方法中实现处理策略的示例代码如下:

 /*** 测试消费重试* @author 攀博课堂(www.pbteach.com)*/public class ConsumerSimple implements RocketMQListener<MessageExt> {​​@Overridepublic void onMessage(MessageExt messageExt) {//取出当前重试次数int reconsumeTimes = messageExt.getReconsumeTimes();//当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理if(reconsumeTimes >=2){//将消息写入数据库,之后正常返回return ;}throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));}}

RocketMQ同步消息、异步消息、单向消息详解相关推荐

  1. 同步FIFO + 异步FIFO 【设计详解及代码分享】

    FIFO表示先入先出,是一种存储结构.可满足一下需求: 1.当输入数据速率和输出速率不匹配时.可作为临时存储单元. 2.用于不同时钟域之间的同步. 3.输入数据路径和输出数据路径之间的数据宽度不匹配时 ...

  2. 微信公众号图文消息添加word附件教程详解

    微信公众号图文消息添加word附件教程详解 我们都知道创建一个微信公众号,在公众号中发布一些文章是非常简单的,但公众号添加附件下载的功能却被限制,如今可以使用小程序"微附件"进行在 ...

  3. RocketMQ的拉(Pull)模式详解

    文章目录 一.RocketMQ的Pull模式 1.1 Pull模式的使用特点 1.2 Java中PullConsumer的几种实现 1.2.1 DefaultMQPullConsumer 1.2.1. ...

  4. WEB后台--邮件和短信业务实现(包括Java一键实现、封装和异步)以及原理详解

    本来就打算针对一些固定的特别点的业务(QQ与网易邮件.拦截设计.短信.定时器等等)来进行记录以及解析原理,这些会比较零散记录在JavaWeb的分类里面,感兴趣的童鞋可以去看下. 有人问为什么要邮件短信 ...

  5. 【队列源码研究】消息队列beanstalkd源码详解

    顺风车运营研发团队 李乐 1.消息队列简介 计算机软件发展的一个重要目标是降低软件耦合性: 网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每 ...

  6. 消息队列RabbitMQ基础知识详解

    一: 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序或者模块对模块的通信方法.MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另 ...

  7. UCOSII中消息邮箱的使用方法详解

    UCOSII中到底如何使用邮箱来进行任务间通信? 前言:什么是行为同步,什么是资源同步? 行为同步: 资源同步: 1.在中断服务程序中(ISR)可不可以发送消息? 在ISR中,是可以发送消息的.类似的 ...

  8. 7-26 Windows消息队列 (25 分)(详解+思路+超时解决)

    一:题目 消息队列是Windows系统的基础.对于每个进程,系统维护一个消息队列.如果在进程中有特定事件发生,如点击鼠标.文字改变等,系统将把这个消息加到队列当中.同时,如果队列不是空的,这一进程循环 ...

  9. SpringCloud工作笔记059---Jquery消息提示插件toastr使用详解

    JAVA技术交流QQ群:170933152 toastr是一个基于jQuery简单.漂亮的消息提示插件,使用简单.方便,可以根据设置的超时时间自动消失. 1.使用很简单,首选引入toastr的js.c ...

  10. 异步FIFO的设计详解(格雷码计数+两级DFF同步)

    文章目录 一.异步FIFO介绍 1.1.空满判断 1.2.跨时钟域问题 1.3.格雷码转换 1.4.格雷码计数器 二.代码code 一.异步FIFO介绍   FIFO有同步和异步两种,同步即读写时钟相 ...

最新文章

  1. python 从url中提取域名和path
  2. 浅谈Java反射机制 之 获取类的字节码文件 Class.forName(全路径名) 、getClass()、class...
  3. 数据结构实验之二叉树三:统计叶子数
  4. python基础语法手册_说一说python中的几个基础语法
  5. Java黑皮书课后题第10章:*10.20(近似e)编程练习题5.26使用下面数列近似计算e(略),为了得到更好的精度,在计算中使用25位精度的BigDecimal
  6. 详解SpringMVC中Controller的方法中参数的工作原理[附带源码分析]
  7. 如何编写用户操作手册
  8. Dialog 基本使用
  9. MySQL事务及隔离级别详解
  10. 面向能源互联网的多端口DC/DC能源路由器控制策略研究
  11. Python:实现通过 isbn 搜索书籍算法(附完整源码)
  12. 短信平台通道搭建 wed网页版源码构架 路由通道多线搭建 后台管理系统的架构
  13. CentOS7.9安装Nextcloud+ocDownloader+aria2使用Nextcloud网盘做离线下载服务器
  14. java面向对象基础练习1(坐标点移动)
  15. C++基础---三目运算符
  16. 【MATLAB】根据已有数据绘制Bode图、时域曲线等(进阶版)
  17. 基于WASM的H265 Web播放器
  18. 孟岩BLOG理解矩阵一、二, 三
  19. Android10 修改音量级别和默认音量
  20. 电脑爱好者GHOSTWIN7纯净版V3.0

热门文章

  1. 我的三周年创作纪念日
  2. 前端学习第二弹:target属性
  3. 仿照利用android系统源码资源文件,修改SeekBar颜色 前景与背景
  4. 【视觉SLAM十四讲】李群与李代数
  5. [学习笔记]上传文件到EC2主机
  6. ROS之Hello word 程序
  7. 用百度接口验证是否上传了身份证图片信息[非姓名,身份证号匹配]
  8. 如何清理休眠文件(hiberfil.sys)
  9. 风尘中,忘了捡拾那日女子留下的胭脂
  10. 产品心理学(13-15)