本章的重点是可靠性,解决如何让消息队列满足业务逻辑需求,同时稳定可靠的长期运行。

顺序消息

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货这三个消息必须按照顺序处理才行。顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息是指某个topic下的所有消息都必须保证吮吸行;部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单,只要保证同一个订单的id的三个消息能够顺序消费即可。

全局顺序消息 

RocketMQ 在默认情况下不保证顺序,比如创建一个topic,默认8个写队列,8个读队列。这时候一条消息可能被写入任意一个队列;在数据读取的过程中,可能有多个consumer,每个consumer也可能启动多个线程并发处理,所以消息被哪个消费者消费,被消费的顺序和写入消息的顺序一致性是不确定的。

要保证全局顺序消息,需要先把topic的读写队列数设置为1,然后生产者和消费者的并发数也设置为1,(我这个项目默认是20个线程并发),简单来说,为了保证整个topic的全局消息有序,只能消除所有的并发处理,各部分都设置为单线程处理,这时高并发、高吞吐的功能基本用不上了。

部分顺序消息

要保证部分消息有序,需要发送端和消费端配合处理。在发送端要做到把同一个业务id的消息发送到同一个Message Queue;在消费过程中要做到从同一个消息队列读取的消息不能被并发处理,这样才能达到部分有序。

发送端使用MessageQueueSelector 来控制消息往哪个队列中发送,如下代码:我这里默认的是所有消息都往0队列发送消息,当然也可以自己去负载均衡策略看你自己

package rocketmq.day05;import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @author heian* @create 2020-01-13-4:34 下午* @description 消息发送到同一个消息队列*/
public class MessageQueueSelectorDemo {public void sendMsgDemo() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("unique_producer_group__name");producer.setInstanceName("instance1");producer.setRetryTimesWhenSendFailed(3);//重试次数 192.168.138.47  192.168.142.133  192.168.0.102producer.setNamesrvAddr("192.168.139.188:9876");//多个用;分割 192.168.138.47producer.start();for (int i = 0; i < 4; i++) {int orderId = i;//消息发送的顺序Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat();String format = sdf.format(date);System.out.println("准备发送:" + format);Message message = new Message("topicName", String.valueOf(i),format.getBytes());SendResult sendResult= new SendResult();// sendResult = producer.send(message);无需指定队列sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id%mqs.size();//return mqs.get(index);//负载均衡策略return mqs.get(0);//指定某个队列}},orderId);System.out.println("key:"+i + "消息的发送结果为:" + sendResult.toString() + "消息ID为:" + sendResult.getMsgId());}producer.shutdown();}public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {MessageQueueSelectorDemo demo = new MessageQueueSelectorDemo();demo.sendMsgDemo();}
}

消费端则是用MessageListenerOrderly 来解决单消息队列的被并发处理的问题:

package rocketmq.day05;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;import java.io.UnsupportedEncodingException;/*** @author heian* @create 2019-12-09-8:12 上午* @description*/
public class MessageQueueSelectorConsume {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_consume_group_name");consumer.setNamesrvAddr("192.168.139.188:9876");//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(MessageModel.CLUSTERING);//默认是集群模式consumer.subscribe("topicName",null);//MessageListenerConcurrently 并发处理消息 监听类consumer.registerMessageListener((MessageListenerOrderly) (listMsg, consumeOrderlyContext) -> {byte[] body = listMsg.get(0).getBody();try {String ms = new String(body,"utf-8");System.out.println(Thread.currentThread().getName()+"收到消息:" + ms);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;//ConsumeConcurrentlyStatus.CONSUME_SUCCESS});consumer.start();}}

生产端负载均衡(消息发送的不是同一个队列)

--发送端
key:0消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F00000000000727BA, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=0], queueOffset=55]消息ID为:C0A88BBC00002A9F00000000000727BA
key:1消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F000000000007285A, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=1], queueOffset=17]消息ID为:C0A88BBC00002A9F000000000007285A
key:2消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F00000000000728FA, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=2], queueOffset=13]消息ID为:C0A88BBC00002A9F00000000000728FA
key:3消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F000000000007299A, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=3], queueOffset=13]消息ID为:C0A88BBC00002A9F000000000007299A--消费端 设置消费消费线程数
ConsumeMessageThread_13收到消息:20-1-13 下午5:47:3
ConsumeMessageThread_14收到消息:20-1-13 下午5:47:2
ConsumeMessageThread_15收到消息:20-1-13 下午5:47:1
ConsumeMessageThread_16收到消息:20-1-13 下午5:47:0

生产端指定队列(消息发送的是同一个队列)

--发送端
key:0消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F0000000000072A3A, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=0], queueOffset=56]消息ID为:C0A88BBC00002A9F0000000000072A3A
key:1消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F0000000000072ADA, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=0], queueOffset=57]消息ID为:C0A88BBC00002A9F0000000000072ADA
key:2消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F0000000000072B7A, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=0], queueOffset=58]消息ID为:C0A88BBC00002A9F0000000000072B7A
key:3消息的发送结果为:SendResult [sendStatus=SEND_OK, msgId=C0A88BBC00002A9F0000000000072C1A, messageQueue=MessageQueue [topic=topicName, brokerName=bogon, queueId=0], queueOffset=59]消息ID为:C0A88BBC00002A9F0000000000072C1A--消费端
ConsumeMessageThread_2收到消息:20-1-13 下午5:57:0
ConsumeMessageThread_2收到消息:20-1-13 下午5:57:1
ConsumeMessageThread_2收到消息:20-1-13 下午5:57:2
ConsumeMessageThread_2收到消息:20-1-13 下午5:57:3

消费端使 MessageListenerOrderly的时候,下面四个consumer的设置依旧可以使用:setConsumeThreadMin    setConsumeThreadMax  setPullBatchSize    setConsumeMessageBatchMaxSize  前两个参数设置消费线程的线程数,PullBatchSize指的是一次从broker的一个消息队列获取消息的最大数,默认是32,ConsumeMessageBatchMaxSize指的是这个consumer的Executor(也就是调用MessageListener处理的地方) 一次传入的消息数(List<MessageExt> msgs这个链表的最大长度),默认值是1。

指的说明的是MessageListenOrderly并不是简单的禁止并发处理,在MessageListenOrderly的实现中,为每个消费队列加了个锁,消费每个消息前,需要先获得这个消息对应的锁,这样保证了同一时间同一消费队列的消息并不是并发消费,但不同的消费队列可以并发消费,这也就是为什么不用队列并没有顺序消费的原因。

消费重复问题

对于分布式消息队列来说,同时做到一定投递和不重复投递是很难的,在鱼和熊掌不可兼得的情况下,rocketmq选择了一定投递,保证消息不会丢失。发送时消息重复,还有一种是重复投递:消费者接收到消息后消费完给服务端应答,网络波动,为了保证消息至少被消费一次,会在网络恢复后再再次发送。就可能造成重复消费。解决消息重复消费有两种方法:

第一是客户端做消息幂等:就是消费一次跟消费几次不影响业务结果。

第二种则是自己维护一个消费记录:你要消费某一条消息,因为消息存在唯一key,将消费的消息插入到数据库。你可以加个消费状态码,00未消费 01消费中  02消费完成,消费前采取分布式锁锁住,再去查询下这条消息是否被消费过,然后再去看则条消息要不要丢弃。当然这些都是要使用者自己去实现的。

动态增减机器

一个消息队列集群由多台机器组成,持续稳定的提供服务,因为业务需求或硬件故障,经常需要增加或减少角色机器,下面介绍下如何在不影响服务器的情况下动态的增减机器。

动态增减NameServer

        NameServer是rocketmq进群的协调者,集群的各个组件是通过NameServer获取各种属性和地址信息的,主要功能分为两部分:各个broker定期上报自己的状态信息到NameServer;另一个是各个客户端,也就是我们项目中的生产者、消费者、以及命令行工具,通过NameServer获取最新的状态信息。所以在启动broker、生产者和消费者之前必须告诉它们NameServer的地址,为了提高可靠性,建议启动多个NameServer,因为其占用的资源不多,可以和broker部署在同一台机器。有了多个NameServer后,减少某个NameServer对其它组件无影响。

有四种方式去设置NameServer地址,下面按照优先级从高到低的方式依次介绍:

  1. 通过代码设置,比如生产者中通过 producer.setNamesrvAddr("ip1:port1;ip2:potr2");在mqadmin工具是通过 -n ip1:port1;ip2:port2 参数进行设置,如果自定义了命令工具,也可以通过defaultMQAdminExt.setNamesrvAddr("ip1:port1;ip2:port2")来设置
  2. 使用java  启动参数设置,对应的option 是 rocketmq.namesrv.addr
  3. 通过Linux环境变量参数这种,在启动前设置变量:NAMESRV_ADDR
  4. 通过http服务来设置,当上述方法都没有使用,程序会想一个http地址发送请求来获取NameServer地址,默认的URL是http://jmenv.tbsite.net.8080/rocketmq/nsaddr(淘宝测试地址),通过rocketmq.namesrv.domain 参数来覆盖jmenv.tbsite.net;通过rocketmq.namesrv.domain.subgroup参数来覆盖sadder

第四种方式看似繁琐,但它是唯一支持动态增加NameServer,无需重启其它组件。使用这种方式其它组件会每个2分钟请求一次URL,获取最新的NameServer地址。

动态增减broker

由于业务增长,需要对集群进行扩容的时候,可以动态增加broker角色的机器。只增加broker不会对原有的topic产生影响,原来创建好的topic中数据的读写依然在原来的那些broker上进行。

集群扩容后,一来是可以把新建的topic指定到新的broker机器上,均衡利用资源,令一种方式是通过updateTopic命令更新现有的topic配置,在新的broker上创建新的队列。比如TestTopic是现有的一个topic主题,因为数据量加大的原因需要扩容,新增加的broker机器地址为:192.168.0.1:10911,这个时候执行下面的的命令:结果就是在新建的broker机器上为TestTopic新创建了8个读写队列

sh ./bin/mqadmin updateTopic -b 192.168.0:10911 -t TestTopic -n 192.168.0.100:9876

因为业务变动或者置换机器需要减少broker,此时该如何操作呢?减少broker要看是否有持续运行的生产者,当一个topic只有一个master broker,停止掉这个broker后,消息发送肯定会受到影响,需要在停止这个broker前,停止发送消息。

当某个topic有多个master broker,停止其中的一个,这时候是否会丢失消息呢?答案和生产者使用发送消息的方式有关系,如果使用同步的方式send(msg),在DefaultMQProducer内部有个自动重试的逻辑,其中一个broker停了,会自动向另一个broker发送消息,不会发生丢消息的现象。如果使用异步方式发送消息send(msg,callback),或者用sendOneWay方式,会丢失切换过程中的消息。因为在异步和sendOneWay两种方式下,Producer.setRentryTimesWhenSendFailed设置不起作用,发送失败不会重试。DefaultMQProducer默认30秒到NameServer请求最新的路由消息,生产者如果获取不到已停止的broker下的队列信息,后续就自动不再向这些队列发送消息。

如果生产者程序能够暂停,在有一个master和slave的情况下也可以顺利切换。可以关闭生产者后关闭master broker,这时候所有的消息读取都会到slave机器,消息消费不受影响(无法写消息到备机),在把master broker 置换完成后,基于原来的数据启动这个master broker,然后生产者在启动正常发送消息。

用Linux的kill pid命令就可以正确关闭broker,BrokerController下有个shutdown函数,这个函数被加到了ShutdownHook里,当用Linux的kill命令(不能用kill -9),shutdown函数会先被执行,也可以通过RocketMQ 提供的mqadmin工具来关闭broker,它们的原理是一样的。

各种故障对消息的影响

我们希望消息队列集群可以一直稳定的运行,但有时候故障总是难免的,下面就是可能会出现的故障情况,看如何处理

1、broker正常关闭,再重启

这种情况属于可控的软件问题,内存中的数据不会丢失,会自动主备切换。如果重启过程中有持续运行的消费者,master broker故障后,消费端会自动连接到对应的slave机器,不会有消息丢失和偏差。当master broker机器重启以后,消费端又回重新连接到master broker。但是需要注意的是:在启动master broker的时候,消费端正在从slave broker消费消息,不要强行停止消费端消费,如果停止了消费端消费,启动master broker后这个时候在启动消费者,此时它就会去读master机器上已经滞后的offset 值,那么之前消费过的消息又回重新再次消费一遍造成重复消费。

如果第一种情况喜爱还有持续运行的生产者,一台master broker故障后,生产者如果采用同步方式发送消息,会进行重试会向topic下其它的master机器发送消息, 如果采用异步方式不会进行重试,会丢失切换过程中的消息,因为它每隔30秒到name server获取路由信息,如果获取不到已经停止的master broker下的队列信息,后续就不会向此队列发送消息。

2、broker 异常crash,然后启动;os crash,重启;机器断电,但能马上恢复供电;

处于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同。如果master、slave都配置成SYNC_FLUSH可以达到和第一种相同的情况,然后是异步刷盘则会出现消息丢失。

3、磁盘损坏;cpu、主板、内存等关键设备损坏

属于硬件故障,原来的机器磁盘数据可能丢失,如果master和slave机器配置成同步复制的方式,某一台机器发生上述故障,消息不会丢失,如果是异步复制,消息自然也就无法同步了,那么此时复制时候的消息会丢失。

总的来说,主从为了保证高可用主从最好设置为同步复制的方式,生产者发消息最好是同步方式去写,刷盘也可以采取同步刷盘(效率较低)就可以消除单点故障,即使某台设备出现极端情况也不会消息丢失!

消息优先级

有些场景下应用程序需要处理几种类型的消息,不同消息的优先等级不同。rocketmq是个先入先出的队列,不支持消息级别或者topic优先级别。业务中简单的优先级需求可以通过间接的方式去解决。

第一种比较简单的情况,如果当topic里面有多种相似类型的消息,比如AB、AC 、AA,当AB、AC的消息很大,但是处理的消息比较慢的时候,队列里面会有很多AB、AC类型的消息在等待处理,这个时候如果有少量的AA加入到队列,就会排在AB、AC类型消息后面,需要等待很长的时间才会被处理(先进先出fifo)。如果业务需要把AA类型的消息被及时处理,可以把相似类型的消息分别拆到两个topic里,比如AA类型的消息放在topicaA中,AB、AC放在topicbB中,然后起两个消费端分别订阅两个topic,不会因为AB、AC类型消息太多而被延迟处理。

第二种情况和第一种类似,但是不用创建大量的topic,举个实际应用场景:一个订单系统,接受100家快递门店过来的请求,把这些请求通过生产者写入到mq,订单处理程序通过消费者从消息队列中读取并处理消息,每天最多处理1万单,如果这100个快递门店的某几家业务剧增,比如一个门店就发出2万个订单请求,这样其它99家门店可能被迫等这一家门店处理完没,也就是后两天的订单才能处理,这显然是不公平的。这时候可以创建一个topic,设置这个topic的消息队列数为100,对应这100家门店,生产者写入消息指定对应的队列即可,而消费者端DefaultMQConsumer默认是采用循环的方式读取一个topic下的所有队列,虽然业务量剧增的门店消息处理不咋变,但是不影响其它门店的正常处理了。DefaultMQConsumer默认的PullBatchSize是32,也就是每次从读取消息的时候最多可以读32个,为了更加公平可以把这个参数设置为1consume.setPullBatchSize(1),但是吞吐量就低了。

第三种情况是强优先级需求,上面两种情况是对消息的优先级不高,更像一个保证公平机制的处理,避免某类消息因量较大而阻塞其它消息的处理。现存在一个应用程序同时处理a,b,c三类消息,a处于第一优先级,b,c为第二第三优先级。对于这种则要求用户自己用编码去实现优先级的控制。如果上述三类消息在一个topic里面,可以使用拉去的pull的模式,自主控制消息队列MessageQueue的遍历以消息的读取;如果消息在三个topic里面,则需要启动三个消费这,实现控制三个消费端的消费优先级。

RocketMQ可靠性优先使用场景相关推荐

  1. mysql可靠性优先策略

    在双 M 结构下,从状态 1 到状态 2 切换的详细过程是这样的: 1判断备库 B 现在的 seconds_behind_master,如果小于某个值(比如 5 秒)继续下一步,否则持续重试这一步: ...

  2. RocketMQ读书笔记7——吞吐量优先的场景

    [Broker端进行消息过滤] 在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络宽带从而提高吞吐量. [过滤方式1--通过Tag过滤] [ 关于Tag和Key ] 对一 ...

  3. RocketMQ和Kafka应用场景与选型

    1.适用场景 kafka适合日志处理 rocketmq适合业务处理 结论:两者没有区别,根据具体业务定夺 2.性能 kafka单机写入TPS号称在百万条/秒 rocketmq大约在10万条/秒 结论: ...

  4. RocketMQ消息乱序场景及解决方法

    消息乱序也是RocketMQ中的一个常见问题,那么到底为什么会出现消息乱序呢? 首先我们知道在RocketMQ的Topic中,会有多个MessageQueue作为数据分片,每个MessageQueue ...

  5. RocketMQ消息重复消费场景及解决办法

    消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等. 那么在什么情况下会发生RocketMQ的消息重复消费呢? 当系统的调用链路比较长的 ...

  6. 面渣逆袭:RocketMQ二十三问

    基础 1.为什么要使用消息队列呢? 消息队列主要有三大用途,我们拿一个电商系统的下单举例: 解耦:引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据--引入消息队 ...

  7. 招聘季,面试前知道RocketMQ这二十三点,大厂面试稳了

    基础 1.为什么要使用消息队列呢? 消息队列主要有三大用途,我们拿一个电商系统的下单举例: 解耦:引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据--引入消息队 ...

  8. 两万字、三十图、二十三问,学会RocketMQ

    基础 1.为什么要使用消息队列呢? 消息队列主要有三大用途,我们拿一个电商系统的下单举例: 解耦:引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据--引入消息队 ...

  9. 2023年郑州春招3年开发面试总结

    MySQL必备知识 MySQL索引结构 介绍B树结构 首先,常规的数据库存储引擎,一般都是采用 B 树或者 B+树来实现索引的存储. 因为 B 树是一种多路平衡树,用这种存储结构来存储大量数据,它的整 ...

最新文章

  1. vba excel 开发游戏_为什么要学习VBA?
  2. python简单代码演示效果-10分钟教你用python 30行代码搞定简单手写识别!
  3. 使用markdown编辑evernote(印象笔记)的常用方法汇总
  4. HTML5新特性介绍
  5. tf.nn.conv2d 与tf.layers.conv2d的区别
  6. 弗吉尼亚理工大学(Virginia Tech)NCR校区招收计算机硕士学位研究生
  7. java 语法 冒号_java中生僻的冒号跳转语法
  8. Spring工作原理分析
  9. (十七)WebGIS中距离及面积测量的原理和实现以及坐标转换的简单介绍
  10. verilog设置24进制计数器_阅读笔记:《Verilog HDL入门》第3章 Verilog语言要素
  11. 计算机应用基础问题,计算机应用基础常见问题
  12. js在wap端获取定位_两款JS脚本判断手机浏览器类型跳转WAP手机网站
  13. android tabhost用法详解,android Tabhost部件详解
  14. matlab绘三维椭球面_MATLAB绘制三维图形
  15. [转载] 晓说——第29期:海上霸主航母(上)
  16. 2021-2-18:请你说说MySQL的字符集与排序规则对开发有哪些影响?
  17. 间歇性需求预测之Croston‘s method
  18. open source软件:Nomad介绍(任务编排工具)
  19. latex行内公式和行间公式
  20. android获取用户手机信息,Android – 使用AccountManager /手机所有者的姓氏和姓氏获取用户数据...

热门文章

  1. 【童晶老师《Python游戏趣味编程》在PyCharm中编辑】
  2. Android应用记录一:有道翻译API调用
  3. php float 向下取整,php怎么实现向下取整
  4. 【css】CSS3有哪些新特性
  5. 卓训教育:面对孩子,家长如何管理好自己的情绪?
  6. html5触屏滑动事件,HTML5的touch事件详解
  7. vscode配置用户代码片段(快捷键自定义代码模板)
  8. python如何实现任务超时处理?
  9. 关于钓鱼攻击和防范这些事
  10. 【MySQL】关于MySQL配置文件路径问题