RocketMQ基础提高进阶demo实例应用
目录
MQ可以做什么?
RocketMQ基础知识
RocketMQ核心组成
NameServer
Broker
Producer&Consumer
RocketMQ设计
RocketMQ优势
Spring Cloud Alibaba 中的RocketMQ
Spring cloud stream
Binder&Binding
Spring message
Spring Integration
从0开发RocketMQ
Producer&Consumer配置
顺序消息
事务消息
Spring cloud alibaba RocketMQ实例
Q&A
MQ可以做什么?
业务异步解耦
解耦微服务
流量削峰填谷
消息分发
分布式事务的数据一致性
主流的 MQ : RocketMQ、kafka、RabbitMQ
RocketMQ基础知识
java、分布式消息中间件。
支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。
支持拉pull和推push两种消息模式。
PS:Push其底层是一个pull模式,它由一个业务代码从MQ中pull消息,再由业务代码push给特定的应用/消费者。
支持多种消息协议,如:JMS\MQTT等。
提供docker镜像用于隔离测试和云集群部署。
RocketMQ核心组成
NameServer、Broker、Producer以及Consumer四部分。
NameServer
RocketMQ 的服务注册中心,是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
作用:
用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找Broker 信息。
每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到NameServer 获取到 Broker 的路由信息,进而和Broker取得连接。Consumer 也会定时获取 Topic 的路由信息。
主要包含两个功能:
1、Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;
2、路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息;
Broker
消息服务器,负责存储消息、转发消息。
PS:元数据,包括消费者组、消费进度偏移量、队列消息等。
Broker 有 Master 和 Slave 两种类型,Master 支持读&写,Slave仅支持读。
Master与Slave的对应关系通过指定相同的Broker Name,不同的BrokerId来定义,BrokerId为0表Master,非0表示Slave。
Broker定时(每隔30s)注册Topic信息到所有Name Server。
PS:部署方式灵活,单master,多master,多master多slave(异步复制,同步双写)
消息存储文件、消息消费队列文件与索引文件。
Producer&Consumer
Producer:消息发布者,负责生产并发送消息至 Topic(brokers)。
ProducerGroup:生产者组,业务上同组producer发送同类消息且发送逻辑一致,部署上则标记一个集群。
Consumer:消息订阅者,负责从 Topic(broker) 接收并消费消息。
ConsumerGroup:消费者组,消费同类消息并且消费逻辑一致。
PS:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。
发送的多种范例:
同步发送:发送消息后,接收到响应再发下一个消息。
异步发送:发送消息后,不等待响应,直接发下一个消息。
单向发送:只负责发送消息而不等待服务器回应且没有回调函数触发。适用例如日志收集等。
消费模式:集群消费(Clustering)和广播消费(Broadcasting)
消费顺序:顺序消费(Orderly)和并行消费(Concurrently)
RocketMQ设计
RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。
NameServer设计简单,集群间互不通信。Topic路由信息在集群间追求最终一致性。
高效的IO存储机制,消息发送的高吞吐量。RocketMQ消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制。所有主题的消息存储基于顺序写,提升写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
设计缺陷:只保证消息被消费者消费,允许消息被重复消费,如果需要避免重复消费则需要在消费端自己实现。
RocketMQ优势
支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持);
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提);
支持 18 个级别的延迟消息(Kafka 不支持)支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认);
支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持);
支持重复消费(RabbitMQ 不支持,Kafka 支持);
Spring Cloud Alibaba 中的RocketMQ
Spring cloud stream
构建基于消息(spring message)的微服务应用框架,使用Spring Integration与 Broker 进行连接。
核心组件:Binder、 Binding。
Binder:消息中间件的组件,用来创建生产者和消费者的Binding。
RocketMQ实现了RocketMQMessageChannelBinder。
Kafka 实现了 KafkaMessageChannelBinder。
RabbitMQ 实现了RabbitMessageChannelBinder
Binding:包括Input Binding和Output Binding。是应用程序与消息中间件的桥梁,屏蔽与消息中间件的接触。
Binder&Binding
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
C consumerProperties);Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);}
Spring message
Spring Messaging spring framework中统一的消息编程模型。
Message:消息对象
MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
MessageHandler:消息处理器接口,用于处理消息逻辑。
Spring Integration
spring framework中用于支持企业集成的一种机制。
MessageDispather:消息分发接口,用于分发消息和添加删除消息处理器。
MessageRoute:消息路由接口,定义默认的输出消息通道。
Filter:消息的过滤注解,用于配置消息过滤表达式。
Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
Splitter:消息的分割,用于将一条消息拆分成多条。
从0开发RocketMQ
Spring cloud Alibaba 版本说明及依赖
https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
<properties><spring.boot.version>2.3.12.RELEASE</spring.boot.version><spring.cloud.version>Hoxton.SR12</spring.cloud.version><spring.cloud.alibaba.version>2.2.8.RELEASE</spring.cloud.alibaba.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring.cloud.alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency></dependencies>
Producer&Consumer配置
生产者&消费者的配置
Producer的配置:
spring:application:name: mq-producercloud:stream:bindings:output:destination: TopicTestcontent-type: application/jsonorder-output:destination: TopicOrdercontent-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:output:producer:group: producer-grouporder-output:producer:group: order-producer-groupsync: true
server:port: 8081Consumer的配置:
spring:application:name: mq-consumercloud:stream:bindings:input:destination: TopicTestcontent-type: application/jsonorder-input:destination: TopicOrdercontent-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:input:comsumer:group: comsumer-grouporder-input:comsumer:group: order-comsumer-grouporderly: true
server:port: 8090
顺序消息
顺序发送&顺序消费
顺序发送:
1、配置中添加sync:true
2、MessageBuilder setHeader 将消息固定发送到第0个消息队列
生产者demo:
package com.cloud.mq.producer.message;import java.util.Arrays;
import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import com.cloud.mq.source.OrderSource;/*** 顺序发送* sync true 同步,false异步(默认)* MessageBuilder setHeader 将消息固定发送到第0个消息队列* @author Administrator**/@RestController
@RequestMapping(value = "/orderly")
public class OrderlyController {@Autowiredprivate OrderSource orderSource; @GetMapping("/send")public String send() {List<String> msgList = Arrays.asList("创建订单","支付完成","申请退款");msgList.stream().forEach(msg->{MessageBuilder<String> builder = MessageBuilder.withPayload(msg)//表示顺序消息,将消息固定发送到第0个消息队列.setHeader(BinderHeaders.PARTITION_HEADER, 0);Message<String> message = builder.build();boolean result = orderSource.output().send(message);System.out.println(msg.concat(">>>"+result+", receiveTime = "+System.currentTimeMillis()));});return "success";}/*** 创建订单>>>true, receiveTime = 1665927552105* 支付完成>>>true, receiveTime = 1665927552110* 申请退款>>>true, receiveTime = 1665927552118*//*** 顺序发送的原理:同一个消息发送到相同的队列。为保证先发送的消息先存储到消息队列,必须使用同步发送的方式。* */
}
自定义source:OrderSource
package com.cloud.mq.source;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface OrderSource {String OUTPUT = "order-output";//String MEM_OUTPU = "meber-output";@Output(OrderSource.OUTPUT)MessageChannel output();/** @Output(OrderSource.MEM_OUTPU) MessageChannel memberOutput();可定义多个通道*/}
消费者demo:
顺序消费配置参数:orderly true ,false并发消费(默认)
顺序消费的原理:同一个消息队列只允许Consumer中的一个消费线程拉取消费。占锁,获得锁的允许消费。
源码:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
ConsumeMessageOrderlyService class ConsumeRequest sync&获取锁注意:consumber中维护了一个线程池,可并发去同一个消息队列中拉去消息进行消费。消费demo:
package com.cloud.mq.consumer.listener;import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;import com.cloud.mq.sink.OrderSink;
/*** 顺序消费* orderly true 顺序消费,false并发消费(默认)* @author Administrator*/@Component
public class OrderlyListener {@StreamListener(value = OrderSink.INPUT)public void receiveOrder(String msg) {System.out.println("order input channel,receive:"+msg+", receiveTime = "+System.currentTimeMillis());}/*** order input channel,receive:创建订单, receiveTime = 1665927552396order input channel,receive:支付完成, receiveTime = 1665927552397order input channel,receive:申请退款, receiveTime = 1665927552398*/}
自定义Skin:
package com.cloud.mq.sink;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface OrderSink {String INPUT = "order-input";@Input(OrderSink.INPUT)SubscribableChannel input();}
事务消息
//事务消息,生产者demo, 消费者代码可重用;@GetMapping("/send")public String sendOrder() {try {TransactionMQProducer producer = new TransactionMQProducer();producer.setNamesrvAddr(RocketMQConst.DEFAULT_NAME_SERVER);producer.setProducerGroup("order-producer-group");//自定义线程池,执行事务操作ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread("Order Transaction Massage Thread"));producer.setExecutorService(executor);//设置事务消息监听器producer.setTransactionListener(new TransactionMsgListener());producer.start();System.err.println("OrderService Start");for (int i = 0;i < 10;i++){String transactionId = UUID.randomUUID().toString();String payload = "下单,orderId: " + transactionId;String tags = "Tag";Message message = new Message("TopicOrder", tags, transactionId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));//发送事务消息TransactionSendResult result = producer.sendMessageInTransaction(message, transactionId);System.err.println("发送事务消息,发送结果: " + result);}} catch (Exception e) {// TODO: handle exceptionSystem.err.println(e.getMessage());}return "ok";
Spring cloud alibaba RocketMQ实例
参见官网demo
收发顺序消息 (aliyun.com)
收发事务消息 (aliyun.com)
收发定时和延时消息 (aliyun.com)
消息队列RocketMQ版客户端如何设置消费线程数? (aliyun.com)
Q&A
1、关于Source&Sink自定义发送&订阅消息通道:
Q:实际开发场景中会存在多个发送或订阅消息通道;
A:参考Source&Sink类可自定义发送或订阅消息通道接口,修改相关配置即可。
2、消息的幂等性
rocketmq不保证消息不被重复消息,如需要保证,需要在业务层面进行幂等性处理,eg 通过分布式锁来完成。
RocketMQ基础提高进阶demo实例应用相关推荐
- 从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘
本文转载自:从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘 事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务.本文对Roc ...
- Java核心技术卷一基础知识第10版demo实例
Java核心技术卷一基础知识第10版demo实例 第三章 JAVA的基本程序设计结构 3.7输入与输出 3.7.1读取输入 3.8控制流程 3.8.3循环 3.10数组 3.10.6多维数组 第四章 ...
- RocketMQ基础概念剖析源码解析
Topic Topic是一类消息的集合,是一种逻辑上的分区.为什么说是逻辑分区呢?因为最终数据是存储到Broker上的,而且为了满足高可用,采用了分布式的存储. 这和Kafka中的实现如出一辙,Kaf ...
- Docker基础、进阶笔记,为k8s的学习预预热
标题 Docker基础.进阶笔记,为k8s的学习预预热 笔记来源于视频: 狂神docker基础篇 狂神docker进阶篇 笔记中图片有些取自于:这位博主的两篇docker笔记中的一些图片 百度云笔记工 ...
- 小程序开发的基础与进阶
小程序的基础与进阶 模板与配置 数据绑定 1.数据绑定的基本原则 1.在data中定义数据 2.在WXML中使用数据 2.在data中定义页面的数据 在页面对应的.js文件中,把数据定义到data对象 ...
- chatgpt赋能python:Python中的4J-Selenium库:从基础到进阶
Python中的4J-Selenium库:从基础到进阶 随着互联网时代的到来,搜索引擎已经成为了每个人日常生活中不可或缺的一部分.SEO(Search Engine Optimization)也就应运 ...
- 计算机编程书籍-笨办法学Python 3:基础篇+进阶篇
编辑推荐: 适读人群 :本书适合所有已经开始使用Python的技术人员,包括初级开发人员和已经升级到Python 3.6版本以上的经验丰富的Python程序员. "笨办法学"系列, ...
- Android 系统(207)---Android各种Drawable讲解和demo实例
Android各种Drawable讲解和demo实例 转载自 : https://blog.csdn.net/linghu_java/article/details/42119969 PS:文字内容 ...
- 由浅入深学java iso_由浅入深学Java:基础、进阶与必做260题 PDF扫描版[47MB]
由浅入深学Java:基础.进阶与必做260题讲解了Java方方面面的知识,全书共分为4篇共28章,第1篇从最基本的JDK安装讲起,包括基本语法与数据类型.数组.字符串.程序控制语句.面向对象编程.继承 ...
最新文章
- JAVA各种并发锁从synchronized 到CAS 到 AQS
- OS X终端颜色[关闭]
- linux 中*与?结合起来的威力,匹配一个或者多个
- vi 搜索命令_vi或vim如何查询关键字
- Symfony2模版引擎使用说明手册
- Failed to load http://localhost:3000/products: The 'Access-Control-Allow-Origin' header has a value
- 微信小程序点播插件_微信小程序 wxParse插件显示视频问题
- WordPress主题:自媒体二号大前端模板
- java内存模型—先行发生原则
- Kafka数据迁移MaxCompute最佳实践
- ISO文件编辑工具-UltraISO软碟通提供下载
- 控制工程实践(11)——控制系统辨识
- 新一代视频编码标准:HEVC、AVS2和AV1性能对比报告
- 天津城市职业学院计算机国考没过,天津市2019年下半年全国计算机等级考试都有哪些考点可以报名?...
- java类为什么要序列化_Java 中的类为什么要实现序列化呢
- python运行报错怎么看_解决python运行启动报错问题
- python的浅拷贝和深copy
- PDF Expert for mac(专业pdf编辑器)兼容12系统兼容m1
- python实现检查邮箱格式是否正确,并检查提交的文件格式是否正确
- 使用Layui制作的简单数据表格