目录

MQ可以做什么?

RocketMQ基础知识

RocketMQ核心组成

NameServer

Broker

Producer&Consumer

RocketMQ设计

RocketMQ优势

Spring Cloud Alibaba 中的RocketMQ

Spring cloud stream

Binder&Binding

Spring message

Spring Integration

从0开发RocketMQ

Producer&Consumer配置

顺序消息

事务消息

Spring cloud alibaba RocketMQ实例

Q&A


MQ可以做什么?

业务异步解耦

解耦微服务

流量削峰填谷

消息分发

分布式事务的数据一致性

主流的 MQ : RocketMQ、kafka、RabbitMQ

RocketMQ基础知识

java、分布式消息中间件。

支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。

支持拉pull和推push两种消息模式。

PS:Push其底层是一个pull模式,它由一个业务代码从MQ中pull消息,再由业务代码push给特定的应用/消费者。

支持多种消息协议,如:JMS\MQTT等。

提供docker镜像用于隔离测试和云集群部署。

RocketMQ核心组成

NameServerBrokerProducer以及Consumer四部分。

NameServer

RocketMQ 的服务注册中心,是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

作用:

用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找Broker 信息。

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到NameServer 获取到 Broker 的路由信息,进而和Broker取得连接。Consumer 也会定时获取 Topic 的路由信息。

主要包含两个功能:

1、Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;

2、路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息;

Broker

消息服务器,负责存储消息、转发消息。

PS:元数据,包括消费者组、消费进度偏移量、队列消息等。

Broker 有 Master 和 Slave 两种类型,Master 支持读&写,Slave仅支持读。

Master与Slave的对应关系通过指定相同的Broker Name,不同的BrokerId来定义,BrokerId0Master,非0表示Slave

Broker定时(每隔30s)注册Topic信息到所有Name Server。

      PS:部署方式灵活,单master,多master,多masterslave(异步复制,同步双写)

消息存储文件、消息消费队列文件与索引文件。

Producer&Consumer

Producer:消息发布者,负责生产并发送消息至 Topic(brokers)。

ProducerGroup:生产者组,业务上同组producer发送同类消息且发送逻辑一致,部署上则标记一个集群。

Consumer:消息订阅者,负责从 Topic(broker) 接收并消费消息。

ConsumerGroup:消费者组,消费同类消息并且消费逻辑一致。

PS:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。

发送的多种范例:

同步发送:发送消息后,接收到响应再发下一个消息。

异步发送:发送消息后,不等待响应,直接发下一个消息。

单向发送:只负责发送消息而不等待服务器回应且没有回调函数触发。适用例如日志收集等。

消费模式:集群消费(Clustering)和广播消费(Broadcasting)

消费顺序:顺序消费(Orderly)和并行消费(Concurrently)

RocketMQ设计

RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。

NameServer设计简单,集群间互不通信。Topic路由信息在集群间追求最终一致性。

高效的IO存储机制消息发送的高吞吐量。RocketMQ消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制。所有主题的消息存储基于顺序写,提升写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件

设计缺陷:只保证消息被消费者消费,允许消息被重复消费,如果需要避免重复消费则需要在消费端自己实现。

RocketMQ优势

支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持);

支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提);

支持 18 个级别的延迟消息(Kafka 不支持)支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认);

支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持);

支持重复消费(RabbitMQ 不支持,Kafka 支持);

Spring Cloud Alibaba 中的RocketMQ

Spring cloud stream

构建基于消息(spring message)的微服务应用框架,使用Spring Integration与 Broker 进行连接。

核心组件:Binder、 Binding。

Binder:消息中间件的组件,用来创建生产者和消费者的Binding。

RocketMQ实现了RocketMQMessageChannelBinder。

Kafka 实现了 KafkaMessageChannelBinder。

RabbitMQ 实现了RabbitMessageChannelBinder

Binding:包括Input Binding和Output Binding。是应用程序与消息中间件的桥梁,屏蔽与消息中间件的接触。

Binder&Binding

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
C consumerProperties);Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);}

Spring message

Spring Messaging  spring framework中统一的消息编程模型。

Message:消息对象
MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration

spring framework中用于支持企业集成的一种机制。

MessageDispather:消息分发接口,用于分发消息和添加删除消息处理器。
MessageRoute:消息路由接口,定义默认的输出消息通道。
Filter:消息的过滤注解,用于配置消息过滤表达式。
Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
Splitter:消息的分割,用于将一条消息拆分成多条。

从0开发RocketMQ

Spring cloud Alibaba 版本说明及依赖

https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E

<properties><spring.boot.version>2.3.12.RELEASE</spring.boot.version><spring.cloud.version>Hoxton.SR12</spring.cloud.version><spring.cloud.alibaba.version>2.2.8.RELEASE</spring.cloud.alibaba.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring.cloud.alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency></dependencies>

Producer&Consumer配置

生产者&消费者的配置

Producer的配置:
spring:application:name: mq-producercloud:stream:bindings:output:destination: TopicTestcontent-type: application/jsonorder-output:destination: TopicOrdercontent-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:output:producer:group: producer-grouporder-output:producer:group: order-producer-groupsync: true
server:port: 8081Consumer的配置:
spring:application:name: mq-consumercloud:stream:bindings:input:destination: TopicTestcontent-type: application/jsonorder-input:destination: TopicOrdercontent-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:input:comsumer:group: comsumer-grouporder-input:comsumer:group: order-comsumer-grouporderly: true
server:port: 8090

顺序消息

顺序发送&顺序消费

顺序发送:

1、配置中添加sync:true

2、MessageBuilder setHeader 将消息固定发送到第0个消息队列

生产者demo:

package com.cloud.mq.producer.message;import java.util.Arrays;
import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import com.cloud.mq.source.OrderSource;/*** 顺序发送*        sync true 同步,false异步(默认)*     MessageBuilder setHeader 将消息固定发送到第0个消息队列* @author Administrator**/@RestController
@RequestMapping(value = "/orderly")
public class OrderlyController {@Autowiredprivate OrderSource orderSource;  @GetMapping("/send")public String send() {List<String> msgList = Arrays.asList("创建订单","支付完成","申请退款");msgList.stream().forEach(msg->{MessageBuilder<String> builder = MessageBuilder.withPayload(msg)//表示顺序消息,将消息固定发送到第0个消息队列.setHeader(BinderHeaders.PARTITION_HEADER, 0);Message<String> message = builder.build();boolean result = orderSource.output().send(message);System.out.println(msg.concat(">>>"+result+", receiveTime = "+System.currentTimeMillis()));});return "success";}/*** 创建订单>>>true, receiveTime = 1665927552105* 支付完成>>>true, receiveTime = 1665927552110* 申请退款>>>true, receiveTime = 1665927552118*//*** 顺序发送的原理:同一个消息发送到相同的队列。为保证先发送的消息先存储到消息队列,必须使用同步发送的方式。*      */
}

自定义source:OrderSource

package com.cloud.mq.source;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface OrderSource {String    OUTPUT = "order-output";//String  MEM_OUTPU = "meber-output";@Output(OrderSource.OUTPUT)MessageChannel output();/** @Output(OrderSource.MEM_OUTPU) MessageChannel memberOutput();可定义多个通道*/}

消费者demo:

顺序消费配置参数:orderly true ,false并发消费(默认)

顺序消费的原理:同一个消息队列只允许Consumer中的一个消费线程拉取消费。占锁,获得锁的允许消费。

源码:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
ConsumeMessageOrderlyService     class ConsumeRequest  sync&获取锁注意:consumber中维护了一个线程池,可并发去同一个消息队列中拉去消息进行消费。消费demo:
package com.cloud.mq.consumer.listener;import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;import com.cloud.mq.sink.OrderSink;
/*** 顺序消费*      orderly true 顺序消费,false并发消费(默认)* @author Administrator*/@Component
public class OrderlyListener {@StreamListener(value = OrderSink.INPUT)public void receiveOrder(String msg) {System.out.println("order input channel,receive:"+msg+", receiveTime = "+System.currentTimeMillis());}/*** order input channel,receive:创建订单, receiveTime = 1665927552396order input channel,receive:支付完成, receiveTime = 1665927552397order input channel,receive:申请退款, receiveTime = 1665927552398*/}

自定义Skin:

package com.cloud.mq.sink;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface OrderSink {String     INPUT = "order-input";@Input(OrderSink.INPUT)SubscribableChannel input();}

事务消息

//事务消息,生产者demo, 消费者代码可重用;@GetMapping("/send")public String sendOrder() {try {TransactionMQProducer producer = new TransactionMQProducer();producer.setNamesrvAddr(RocketMQConst.DEFAULT_NAME_SERVER);producer.setProducerGroup("order-producer-group");//自定义线程池,执行事务操作ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread("Order Transaction Massage Thread"));producer.setExecutorService(executor);//设置事务消息监听器producer.setTransactionListener(new TransactionMsgListener());producer.start();System.err.println("OrderService Start");for (int i = 0;i < 10;i++){String transactionId = UUID.randomUUID().toString();String payload = "下单,orderId: " + transactionId;String tags = "Tag";Message message = new Message("TopicOrder", tags, transactionId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));//发送事务消息TransactionSendResult result = producer.sendMessageInTransaction(message, transactionId);System.err.println("发送事务消息,发送结果: " + result);}} catch (Exception e) {// TODO: handle exceptionSystem.err.println(e.getMessage());}return "ok";

Spring cloud alibaba RocketMQ实例

参见官网demo

收发顺序消息 (aliyun.com)

收发事务消息 (aliyun.com)

收发定时和延时消息 (aliyun.com)

消息队列RocketMQ版客户端如何设置消费线程数? (aliyun.com)

Q&A

1、关于Source&Sink自定义发送&订阅消息通道:

Q:实际开发场景中会存在多个发送或订阅消息通道;

A:参考Source&Sink类可自定义发送或订阅消息通道接口,修改相关配置即可。

2、消息的幂等性

rocketmq不保证消息不被重复消息,如需要保证,需要在业务层面进行幂等性处理,eg 通过分布式锁来完成。

RocketMQ基础提高进阶demo实例应用相关推荐

  1. 从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘

    本文转载自:从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘 事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务.本文对Roc ...

  2. Java核心技术卷一基础知识第10版demo实例

    Java核心技术卷一基础知识第10版demo实例 第三章 JAVA的基本程序设计结构 3.7输入与输出 3.7.1读取输入 3.8控制流程 3.8.3循环 3.10数组 3.10.6多维数组 第四章 ...

  3. RocketMQ基础概念剖析源码解析

    Topic Topic是一类消息的集合,是一种逻辑上的分区.为什么说是逻辑分区呢?因为最终数据是存储到Broker上的,而且为了满足高可用,采用了分布式的存储. 这和Kafka中的实现如出一辙,Kaf ...

  4. Docker基础、进阶笔记,为k8s的学习预预热

    标题 Docker基础.进阶笔记,为k8s的学习预预热 笔记来源于视频: 狂神docker基础篇 狂神docker进阶篇 笔记中图片有些取自于:这位博主的两篇docker笔记中的一些图片 百度云笔记工 ...

  5. 小程序开发的基础与进阶

    小程序的基础与进阶 模板与配置 数据绑定 1.数据绑定的基本原则 1.在data中定义数据 2.在WXML中使用数据 2.在data中定义页面的数据 在页面对应的.js文件中,把数据定义到data对象 ...

  6. chatgpt赋能python:Python中的4J-Selenium库:从基础到进阶

    Python中的4J-Selenium库:从基础到进阶 随着互联网时代的到来,搜索引擎已经成为了每个人日常生活中不可或缺的一部分.SEO(Search Engine Optimization)也就应运 ...

  7. 计算机编程书籍-笨办法学Python 3:基础篇+进阶篇

    编辑推荐: 适读人群 :本书适合所有已经开始使用Python的技术人员,包括初级开发人员和已经升级到Python 3.6版本以上的经验丰富的Python程序员. "笨办法学"系列, ...

  8. Android 系统(207)---Android各种Drawable讲解和demo实例

    Android各种Drawable讲解和demo实例 转载自  : https://blog.csdn.net/linghu_java/article/details/42119969 PS:文字内容 ...

  9. 由浅入深学java iso_由浅入深学Java:基础、进阶与必做260题 PDF扫描版[47MB]

    由浅入深学Java:基础.进阶与必做260题讲解了Java方方面面的知识,全书共分为4篇共28章,第1篇从最基本的JDK安装讲起,包括基本语法与数据类型.数组.字符串.程序控制语句.面向对象编程.继承 ...

最新文章

  1. JAVA各种并发锁从synchronized 到CAS 到 AQS
  2. OS X终端颜色[关闭]
  3. linux 中*与?结合起来的威力,匹配一个或者多个
  4. vi 搜索命令_vi或vim如何查询关键字
  5. Symfony2模版引擎使用说明手册
  6. Failed to load http://localhost:3000/products: The 'Access-Control-Allow-Origin' header has a value
  7. 微信小程序点播插件_微信小程序 wxParse插件显示视频问题
  8. WordPress主题:自媒体二号大前端模板
  9. java内存模型—先行发生原则
  10. Kafka数据迁移MaxCompute最佳实践
  11. ISO文件编辑工具-UltraISO软碟通提供下载
  12. 控制工程实践(11)——控制系统辨识
  13. 新一代视频编码标准:HEVC、AVS2和AV1性能对比报告
  14. 天津城市职业学院计算机国考没过,天津市2019年下半年全国计算机等级考试都有哪些考点可以报名?...
  15. java类为什么要序列化_Java 中的类为什么要实现序列化呢
  16. python运行报错怎么看_解决python运行启动报错问题
  17. python的浅拷贝和深copy
  18. PDF Expert for mac(专业pdf编辑器)兼容12系统兼容m1
  19. python实现检查邮箱格式是否正确,并检查提交的文件格式是否正确
  20. 使用Layui制作的简单数据表格

热门文章

  1. Vultr Debian系统如何重置ROOT管理员密码
  2. 存储器国产化为何选3D NAND作为突破口?
  3. Android 一个简单手机响铃功能实现
  4. NVIDIA Jetson AGX Xavier
  5. Windows平台录音类封装:AudioRecordWindows
  6. 华大HC32L130 SPI和GPIO模拟驱动NF-03和NF-01-s模块(SI24R1方案,兼容NRF24L01)
  7. 啃书:《利用Python进行数据分析》第一章
  8. 电路设计中晶体和晶振的区别
  9. HBuilderX网站打包APP
  10. 瑞吉外卖项目day01