注:基础请参考上一篇文章RocketMQ简介与安装及入门

1. 重试策略

在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试需要分2种,分别是producer端重试和consumer端重试。

1.1 producer端重试

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("Hello");producer.setNamesrvAddr("192.16.55.185:9876");//消息发送失败时,重试3次producer.setRetryTimesWhenSendFailed(3);producer.start();String msgStr = "用户A发送消息给用户B";Message msg = new Message("hello_topic","SEND_MSG",msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息,并且指定超时时间SendResult sendResult = producer.send(msg, 1000);System.out.println("消息状态:" + sendResult.getSendStatus());System.out.println("消息id:" + sendResult.getMsgId());System.out.println("消息queue:" + sendResult.getMessageQueue());System.out.println("消息offset:" + sendResult.getQueueOffset());System.out.println(sendResult);producer.shutdown();}
}

1.2 consumer端重试

消费者端的失败,分为2种情况,一个是exception,一个是timeout。

1.2.1 exception

消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
消息的状态:

package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}

可以看到,消息的状态分为成功或者失败。如果返回的状态为失败会怎么样呢?
在启动broker的日志中可以看到这样的信息:

INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h

这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Hello");consumer.setNamesrvAddr("192.16.55.185:9876");// 订阅topic,接收此Topic下的所有消息consumer.subscribe("my-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->" + msgs);if(msgs.get(0).getReconsumeTimes() >= 3){// 重试3次后,不再进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}
1.2.2 timeout

比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。

2 RocketMQ的集群

2.1 集群模式

在RocketMQ中,集群的部署模式是比较多的,有以下几种:

  • 单个Master
    这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
  • Master模式
    一个集群无Slave,全是Master,例如2个Master或者3个Master。单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
  • 多Master多Slave模式,异步复制。每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预。性能同多Master模式几乎一样。
    缺点:Master宕机,磁盘损坏情况,会丢失少量消息。
  • 多Master多Slave模式,同步双写。每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功。
    优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
    缺点:性能比异步复制模式略低,大约低10%左右。

2.2 搭建2m2s集群

下面通过docker搭建2master+2slave的集群。

#创建2个master
#nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver01/logs:/opt/logs \
-v /haoke/rmq/rmqserver01/store:/opt/store \
foxiswho/rocketmq:server-4.5.2
#nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver02/logs:/opt/logs \
-v /haoke/rmq/rmqserver02/store:/opt/store \
foxiswho/rocketmq:server-4.5.2#创建第1个master broker
#master broker01
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker01/logs:/opt/logs \
-v /haoke/rmq/rmqbroker01/store:/opt/store \
foxiswho/rocketmq:broker-4.5.2#配置
namesrvAddr=192.16.55.185:9876;192.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.16.55.185
brokerIp2=192.16.55.185
listenPort=10911#创建第2个master broker
#master broker02
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker02/logs:/opt/logs \
-v /haoke/rmq/rmqbroker02/store:/opt/store \
foxiswho/rocketmq:broker-4.5.2
#master broker02
namesrvAddr=192.16.55.185:9876;192.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.16.55.185
brokerIp2=192.16.55.185
listenPort=10811#创建第1个slave broker
#slave broker01
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker03/logs:/opt/logs \
-v /haoke/rmq/rmqbroker03/store:/opt/store \
foxiswho/rocketmq:broker-4.5.2
#slave broker01
namesrvAddr=192.16.55.185:9876;192.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.16.55.185
brokerIp2=192.16.55.185
listenPort=10711#创建第2个slave broker
#slave broker01
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker04/logs:/opt/logs \
-v /haoke/rmq/rmqbroker04/store:/opt/store \
foxiswho/rocketmq:broker-4.5.2
#slave broker02
namesrvAddr=192.16.55.185:9876;192.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.16.55.185
brokerIp2=192.16.55.185
listenPort=10611#启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04

3. SprinBoot整合RocketMQ

3.1 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.0.RELEASE</version></parent><groupId>cn.itcast.rocketmq</groupId><artifactId>itcast-rocketmq</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version></dependency></dependencies><build><plugins><!-- java编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build>
</project>

说明:rocketmq-spring-boot-starter的依赖包是不能直接从中央仓库下载的,需要自己通过源码install到本地仓库的。
#源码地址(或者使用资料中的源码)
https://github.com/apache/rocketmq-spring
#进入源码目录,执行如下命令
mvn clean install

3.2 编写application.properties配置文件

# Spring boot application
spring.application.name = itcast-rocketmq
spring.rocketmq.nameServer=192.16.55.185:9876
spring.rocketmq.producer.group=my-group

3.3 生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SpringProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic, String msg){this.rocketMQTemplate.convertAndSend(topic, msg);}
}

3.4 消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup = "my-consumer",selectorExpression = "*")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("接收到消息 -> " + msg);}
}

3.5 编写启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class, args);}
}

3.6 测试代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {@Autowiredprivate SpringProducer springProducer;@Testpublic void testSendMsg(){this.springProducer.sendMsg("my-topic", "第一个Spring消息");}
}

3.7 事务消息

定义TransactionListenerImpl

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;
@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new
HashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message,
Object o) {String transId =
(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println("执行操作1");Thread.sleep(500);System.out.println("执行操作2");Thread.sleep(800);STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId =
(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);System.out.println("回查消息 -> transId = " + transId + ", state = " +
STATE_MAP.get(transId));return STATE_MAP.get(transId);}
}

生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Component
public class SpringTransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic, String msg) {Message message = MessageBuilder.withPayload(msg).build();// myTransactionGroup要和@RocketMQTransactionListener(txProducerGroup =
"myTransactionGroup")定义的一致this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup",topic,message,null);System.out.println("发送消息成功");}
}

消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup = "my-consumer",selectorExpression = "*")
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("接收到消息 -> " + msg);}
}

测试用例

import cn.itcast.rocketmq.spring.transaction.SpringTransactionProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {@Autowiredprivate SpringProducer springProducer;@Autowiredprivate SpringTransactionProducer springTransactionProducer;@Testpublic void testSendMsg(){this.springProducer.sendMsg("my-topic", "第2个Spring消息");}@Testpublic void testSendMsg2(){this.springTransactionProducer.sendMsg("my-topic", "第3个Spring消息");}
}

RocketMQ重试策略及与Springboot整合相关推荐

  1. RocketMQ单机环境搭建测试+springboot整合

    1.资源下载 官网:下载 | RocketMQ 这里选择使用编译后可以直接用的 下载后解压:略 2.更改配置 主要是更改 conf/broker.conf 的配置,记得添加上下面这几行,否则消息发送失 ...

  2. Springboot整合RocketMQ实战

    本文来说下Springboot如何整合RocketMQ. 文章目录 概述 Springboot整合RocketMQ 引入pom依赖 yaml文件 简单实例 本文小结 概述 消息队列rocketmq是A ...

  3. springboot整合rocketMQ记录 实现发送普通消息,延时消息

    一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...

  4. Springboot整合Rocketmq系列教程

    Springboot整合Rocketmq系列教程 本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现. 本文不会对roc ...

  5. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  6. springboot整合使用rocketMq

    前文,我们讲述了rocketMq的基本使用,接下来聊聊如何使用springboot整合使用rocketMq; 1)新建maven工程,工程结构目录如图: constants包下存放着常量信息,这里保存 ...

  7. Springboot 整合Retry 实现重试机制

    重试,在项目需求中是非常常见的,例如遇到网络波动等,要求某个接口或者是方法可以最多/最少调用几次: 实现重试机制,非得用Retry这个重试框架吗?那肯定不是,相信很多伙伴手写一下控制流程的逻辑也可以达 ...

  8. springboot整合redis实现HyperLogLog统计文章浏览量使用过期策略完成数据库同步

    springboot整合redis实现HyperLogLog统计文章浏览量&&使用过期策略完成数据库同步 本文目录 springboot整合redis实现HyperLogLog统计文章 ...

  9. RabbitMQ消息队列讲解(涵盖生产者消费者以及Springboot整合)

    文章目录 什么是消息队列 为什么用消息队列 常见MQ产品 RabbitMQ 特性 下载与安装 erlang RabbitMQ 工作原理 工作流程 消息模型 基本消息模型 生产者 消费者 消息确认机制( ...

最新文章

  1. 正则表达式简介及在C++11中的简单使用
  2. linux nice线程,linux nice 线程
  3. Python使用aiohttp异步爬取糗事百科
  4. Scrapy保存到txt文件或者数据库里面
  5. 知乎高赞:看懂这个颠覆世界观的认知,远比做1000道题更有用!
  6. 【超详细教程】如何使用TypeScript和GraphQL开发应用
  7. 对象入参指定泛型类型_为什么要使用泛型,而不是直接将类型作为参数传递?
  8. 双系统安装ubuntu后没有windows启动项
  9. C/C++ OpenCV五种滤波器综合示例
  10. jenkins 下载插件 一直失败_实用测试技能分享:jmeter+Jenkins性能测试自动化搭建...
  11. [leetcode周赛] 1349. 参加考试的最大学生数
  12. Jquery第二章appendTo方法到方法的使用练习第二节
  13. 程序员应该收藏哪些资讯类网站
  14. android 实现应用程序后台运行的说明
  15. 学生选课系统简单说明
  16. 城市道路智慧照明服务认证的流程及作用
  17. 因子分析(FA)算法简述
  18. Linux 30岁了,你知道吗?
  19. iPhone全系列进入恢复模式方法
  20. 这些华为技巧,花粉都不一定全知道

热门文章

  1. 云仓代发货到底是什么?
  2. Trinity安装与报错
  3. TTL转RS485电路(自动收发)---分享原理图及参考资料
  4. Goroutines和线程对比
  5. php多线程原子操作,C语言线程互斥和原子操作
  6. C语言中pthread或Windows API在多线程编程中的基本应用
  7. php输入框里的提示文字,h5和css3制作带提示文字的输入框
  8. 第5章 虚拟存储器
  9. IE8的调试工具使用详解(下)
  10. NumPy库回顾与分享(一)