rocketmq 消息 自定义_RocketMQ的消息发送及消费
RocketMQ消息支持的模式:
消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay。
消息同步发送:
普通消息的发送和接收在前面已经演示过了,在前面的案例中是基于同步消息发送模式。也就是说消息发送出去后,producer会等到broker回应后才能继续发送下一个消息.
消息异步发送:
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
producer.send(msg, newSendCallback() {
@Overridepublic voidonSuccess(SendResult sendResult) {
System.out.printf("%s%n",sendResult);
}
@Overridepublic voidonException(Throwable throwable) {
throwable.printStackTrace();
}
});
OneWay:
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答.效率最高。
producer.sendOneway(msg);
MessageListenerOrderly(顺序消费):
在学习kafka的时候我们知道了,消息可以通过自定义分区策略来实现消息的顺序发送,实现原理就是把同一类消息都发送到相同的分区上。在RocketMQ中,是基于多个Message Queue来实现类似于kafka的分区效果。如果一个Topic 要发送和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个Topic 可以根据需求设置一个或多个Message Queue。Topic 有了多个Message Queue 后,消息可以并行地向各个Message Queue 发送,消费者也可以并行地从多个Message Queue 读取消息并消费。要了解RocketMQ消息的顺序消费,我们先对RocketMQ的整体架构进行了解。
RocketMQ消息发送及消费的基本原理:
这是一个比较宏观的部署架构图,rocketmq天然支持高可用,它可以支持多主多从的部署架构,这也是和kafka最大的区别之一。原因是RocketMQ中并没有master选举功能,所以通过配置多个master节点来保证rocketMQ的高可用。和所有的集群角色定位一样,master节点负责接受事务请求、slave节点只负责接收读请求,并且接收master同步过来的数据和slave保持一直。当master挂了以后,如果当前rocketmq是一主多从,就意味着无法接受发送端的消息,但是消费者仍然能够继续消费。所以配置多个主节点后,可以保证当其中一个master节点挂了,另外一个master节点仍然能够对外提供消息发送服务。
当存在多个主节点时,一条消息只会发送到其中一个主节点,rocketmq对于多个master节点的消息发送,会做负载均衡,使得消息可以平衡的发送到多个master节点上。一个消费者可以同时消费多个master节点上的消息,在下面这个架构图中,两个master节点恰好可以平均分发到两个消费者上,如果此时只有一个消费者,那么这个消费者会消费两个master节点的数据。由于每个master可以配置多个slave,所以如果其中一个master挂了,消息仍然可以被消费者从slave节点消费到。可以完美的实现rocketmq消息的高可用。
站在topic的角度来看看消息是如何分发和处理的,假设有两个master节点的集群,创建了一个TestTopic,并且对这个topic创建了两个队列(可以通过producer进行设置producer.setDefaultTopicQueueNums(2),默认是4),也就是分区。消费者定义了两个分组,分组的概念也是和kafka一样,通过分组可以实现消息的广播。
自定义消息发送规则:
通过自定义发送策略来实现消息只发送到同一个队列因为一个Topic 会有多个Message Queue ,如果使用Producer 的默认配置,这个Producer 会轮流向各个Message Queue 发送消息。Consumer 在消费消息的时候,会根据负载均衡策略,消费被分配到的Message Queue如果不经过特定的设置,某条消息被发往哪个Message Queue ,被哪个Consumer 消费是未知的如果业务需要我们把消息发送到指定的Message Queue 里,比如把同一类型的消息都发往相同的Message Queue。那是不是可以实现顺序消息的功能呢?
和kafka一样,rocketMQ也提供了消息路由的功能,我们可以自定义消息分发策略,可以实现MessageQueueSelector,来实现自己的消息分发策略
SendResult sendResult=producer.send(msg, newMessageQueueSelector() {
@Overridepublic MessageQueue select(Listlist, Message message, Object o) {int key=o.hashCode();int size =list.size();int index = key%size;return list.get(index);//list.get(0);
}
},"key_"+i);
在消息分发的时候如果消息发送到topic多个MessageQueue,假设设置2个写队列以及2个读队列,如果读和写队列不一致,会存在消息无法消费到的问题,如果消费队列为2,启动一个消费者,那么这个消费者会消费者两个队列,如果两个消费者消费这个队列,那么意味着消息会均衡分摊到这两个消费者中,如果消费者数大于readQueueNumbs,那么会有一些消费者消费不到消息,浪费资源。
消息的顺序消费:
首先,需要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证是有序的。我们要做到生产者 - messagequeue - 消费者之间是一对一对一的关系。
通过分区规则可以实现同类消息在rocketmq上的顺序存储。但是对于消费端来说,如何保证消费的顺序?我们前面写的消息消费代码使用的是MessageListenerConcurrently并发监听,也就是基于多个线程并行来消费消息。这个无法保证消息消费的顺序。RocketMQ中提供了MessageListenerOrderly 一个类来实现顺序消费
consumer.registerMessageListener(newMessageListenerOrderly() {
@Overridepublic ConsumeOrderlyStatus consumeMessage(Listlist, ConsumeOrderlyContext consumeOrderlyContext) {
MessageExt messageExt=list.get(0);
if(messageExt.getReconsumeTimes()==3){ //消息重发了三次//持久化 消息记录表
return ConsumeOrderlyStatus.SUCCESS; //签收
}return ConsumeOrderlyStatus.SUCCESS; //签收
}
});
顺序消费会带来一些问题,
遇到消息失败的消息,无法跳过,当前队列消费暂停
降低了消息处理的性能
消费端的负载均衡:
和kafka一样,消费端也会针对Message Queue做负载均衡,使得每个消费者能够合理的消费多个分区的消息。
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载
消费端遍历自己的所有topic,依次调rebalanceByTopic
根据topic获取此topic下的所有queue
选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)
选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法
什么时候触发负载均衡:
消费者启动之后
消费者数量发生变更
每10秒会触发检查一次rebalance
分配算法,RocketMQ提供了6中分区的分配算法:
AllocateMessageQueueAveragely :平均分配算法(默认)
AllocateMessageQueueAveragelyByCircle:环状分配消息队列
AllocateMessageQueueByConfig:按照配置来分配队列: 根据用户指定的配置来进行负载
AllocateMessageQueueByMachineRoom:按照指定机房来配置队列
AllocateMachineRoomNearby:按照就近机房来配置队列:
AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行
消息的的可靠性原则:
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?
消息消费端的确认机制:RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功
consumer.registerMessageListener(newMessageListenerConcurrently() {
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(Listmsgs,ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消费状态
}
});
所有消费者在设置监听的时候会提供一个回调,业务实现消费回调的时候,当回调方法中返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
消息的衰减重试:
为了保证消息肯定至少被消费一次,RocketMQ会把这批消息重新发回到broker,在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。可以修改broker-a.conf文件messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 。一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。
consumer.registerMessageListener((MessageListenerConcurrently) (list,
consumeOrderlyContext)->{for(MessageExt messageExt : list) {if(messageExt.getReconsumeTimes()==3) {//可以将对应的数据保存到数据库,以便人工干预
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
RocketMQ中的延迟消息
开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。
rocketmq 消息 自定义_RocketMQ的消息发送及消费相关推荐
- rocketmq 消息 自定义_RocketMQ消息轨迹-设计篇
RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...
- tcp前4字节消息长度_RocketMQ的消息存储格式
总体代码 我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是DefaultMessageStore,我们可以通过DefaultMessageStore ...
- rocketmq 消息 自定义_rocketmq中的自定义消息头
在springboot中使用rocketmq的客户端,有2种方式. 一是使用 org.apache.rocketmq的rocketmq-client 二是,在pom.xml中引用 org.apache ...
- rocketmq存储结构_RocketMQ消息存储
存储架构 RMQ存储架构 上图即为RocketMQ的消息存储整体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog,1G) ...
- rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring
博客地址:朝·闻·道www.wuwenliang.net 本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友 ...
- rocketmq延时消息自定义配置;topic下tag使用
概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...
- rocketmq怎么保证消息一致性_RocketMQ为什么要保证订阅关系的一致性?
微信公众号「后端进阶」,专注后端技术分享:Java.Golang.WEB框架.分布式中间件.服务治理等等. 前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅 ...
- 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
作者 | 辽天 来源 | 阿里巴巴云原生公众号 导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 sp ...
- rocketmq订阅多个主题_RocketMQ 的消息模型
点击上方蓝字"废材姑娘"关注我, 让我们一起成长 摄影: 双儿 开篇 不知道大家跟我是不是有同样的困惑, 在学习不同的消息框架时都会接触到主题, 队列, 分区, 生产者, 消费者等 ...
最新文章
- 【译】CSS动画 vs JS动画
- 从 webpack 到全面拥抱 Parcel #1 探索 Parcel
- oracle时间相减得到天数_【数列】从错位相减到阿贝尔变换
- 字节跳动秋招超6000人,渣本双非的出路都被谁堵死了?
- sqlite java excel,Android将Excel表数据导入SQLite数据库
- Android -- Intent
- 关于css选择器的问题 + * ~这三个有什么区别
- [BZOJ 1012] 最大数maxnumber
- php中...的用法
- sql limit 子句_Java 8流中的常见SQL子句及其等效项
- 关于测试一个接口的面试题
- php与mysql网页实例,php与mysql 实例
- 《网管员必读——网络组建》(第2版)导读
- 微信模板消息字体设置变大
- Linux 下查看内存问题
- html img标签alt属性吗,img标签可以不用alt属性吗
- Go 为什么选择 Gopher 作为吉祥物?
- SD内存卡格式化后如何数据恢复教程
- 基于Java的qq截图工具设计开发(含源文件)
- 跨考计算机—努力换青春无悔(纪录篇)
热门文章
- html入门学习(二)
- 反置页表(1__操作系统)
- docker设置http_proxy https_proxy解决gcr.io/kaniko-project/executor:v1.7.0之类的镜像拉取问题
- 【收藏】Geomesa(三)图层的裁剪分析
- 【收藏】wiztree大文件查找软件
- Webpack安装及打包js、css文件示例
- k8s kubesphere安装在k8s中的基础环境准备(前提条件)
- arthas命令使用示例:monitor监视指定方法的执行情况
- solr中的ik分词器的原理是什么
- MySQL数据库事务的特性