Rocketmq–消息驱动

mq的简介

什么是MQ

MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数
据结构。

MQ的应用场景

异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如
下:

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续
的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返
回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将
较耗时而且不需要即时(同步)返回结果的操作
作为消息放入消息队列。同时,由于使用了消息队列
MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即
解耦合。

流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流
量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解
决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

常见的MQ产品

目前业界有很多MQ产品,比较出名的有下面这些:
ZeroMQ
号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言
实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。
ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。

RabbitMQ
使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。

ActiveMQ
历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和springjms轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。

RocketMQ
阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来
很简单。

Kafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,
相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布
式系统。

RocketMQ入门

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用
非常广泛,已经经过了"双11"这种万亿级的消息流转。

1、RocketMQ环境搭建

接下来我们先在linux平台下安装一个RocketMQ的服务

2、环境准备

下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求
Linux 64位操作系统
64bit JDK 1.8+

安装RocketMQ

上传文件到Linux系统

[root@heima rocketmq]# ls /usr/local/src/ rocketmq-all-4.4.0-bin-release.zip

解压到安装目录

[root@heima src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@heima src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

启动RocketMQ

切换到安装目录

[root@heima rocketmq]# ls
benchmark bin conf lib LICENSE NOTICE README.md

启动NameServer

[root@heima rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log

启动Broker

# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
[root@heima rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@heima rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

测试RocketMQ
测试消息发送

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

测试消息接收

[root@heima rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@heima rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

RocketMQ的架构及概念


如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer。
Broker(邮递员)
Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
NameServer(邮局)
消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
Producer(寄件人)
消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消

Consumer(收件人)
消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消

Topic(地区)
用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
Message Queue(邮件)
为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message
Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个
Message Queue读取消息
Message
Message 是消息的载体。
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
Consumer Group
消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

RocketMQ控制台安装

下载

在git上下载下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases

修改配置文件

# 修改配置文件
rocketmq-console\src\main\resources\application.properties server.port=7777
#项目启动后的端口号 rocketmq.config.namesrvAddr=192.168.109.131:9876
#nameserv的地址,注意防火墙要开启 9876端口

打成jar包,并启动

# 进入控制台项目,将工程打成jar包 mvn clean package -Dmaven.test.skip=true
# 启动控制台 java -jar target/rocketmq-console-ng-1.0.0.jar

访问控制台

maven 整合mq

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>

发送消息
消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
package com.qf;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class RocketmqSend {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("myproducer-group");defaultMQProducer.setNamesrvAddr("192.168.229.135:9876");defaultMQProducer.start();Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());SendResult sendResult = defaultMQProducer.send(msg,10000);System.out.println(sendResult);//6. 关闭生产者defaultMQProducer.shutdown();}
}

接收消息
消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
package com.qf;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class RocketmqReciver {public static void main(String[] args) throws MQClientException {//消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");//从哪个消息服务器拿数据consumer.setNamesrvAddr("192.168.229.135:9876");consumer.subscribe("myTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println(messageExt);byte[] body = messageExt.getBody();String s = new String(body);System.out.println(s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

springboot 整合mq

案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

商品微服务发送消息
在 商品微服务中添加rocketmq的依赖

  <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>

添加配置

server:port: 8083
spring:application:name: good-servercloud:nacos:discovery:server-addr: 192.168.229.135:8848username: nacospassword: nacos
rocketmq:name-server: 192.168.229.135:9876

编写代码

package com.qf.controller;import com.qf.entity.Goods;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@RestController
@RequestMapping("/message")
public class MessageController {@Autowiredprivate RocketMQTemplate template;@RequestMapping("/send")public Goods sendGoodMesaage(){Goods goods = new Goods("小米手机", 5600, new Date());//发送消息template.convertAndSend("myTopic",goods);return goods;}
}
package com.qf.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Data
@NoArgsConstructor
@AllArgsConstructor
public class Goods {private String goodsName;private Integer price;private Date saleDate;
}

订单微服务订阅消息
在 订单微服务中添加rocketmq的依赖

 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>

添加配置

server:port: 8084
spring:application:name: order-servercloud:nacos:discovery:server-addr: 192.168.229.135:8848username: nacospassword: nacos
rocketmq:name-server: 192.168.229.135:9876

编写代码

package com.qf.controller.service;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "myproducer-group",topic = "myTopic")
public class SmsService implements RocketMQListener<Goods> {@Overridepublic void onMessage(Goods goods) {log.info("接收到的订单新秀{}", JSON.toJSON(goods));}
}

启动服务,执行下单操作,观看后台输出

发送不同类型的消息

普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送

可靠同步发送

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

可靠异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

   <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency>
import com.qf.GoodsApplication;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = GoodsApplication.class)
public class SendMessage {@Autowiredprivate RocketMQTemplate template;@Testpublic void sysncTest(){for (int i = 0; i < 10; i++) {SendResult myTopic = template.syncSendOrderly("myTopic", "这是一条同的消息,需要被处理", "xx");System.out.println(myTopic);}}@Testpublic void asyscnTest() throws InterruptedException {template.asyncSend("myTopic", "这是一条异步的消息,需要被处理", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});Thread.sleep(1000);}@Testpublic void oneWay(){template.sendOneWay("myTopic", "这是一条异步的消息,需要被处理");}@Testpublic void oneWayOrder(){for (int i = 0; i < 10; i++) {template.sendOneWayOrderly("myTopic", "这是一条排好序的消息,需要被处理","xx");}}//批量发送接收消息@Testpublic void batchSendMessage(){List<Message> list=new ArrayList<>();for (int i = 0; i < 10; i++) {Message msg = new Message("myTopic", "tag1", ("RocketMQ Message cout:"+i).getBytes());list.add(msg);}template.syncSend("myTopic",list,1000);}//延时消息@Testpublic void delayMessage(){Message msg = new Message("myTopic", "tag1", ("Rocketmq Delay Message").getBytes());SendResult myTopic = template.syncSend("myTopic", MessageBuilder.withPayload(msg).build(), 1000, 3);System.out.println(myTopic);}
}

三种发送方式的对比

发送方式 发送方式 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型,如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的(消息会落在topic下的一个队列,按顺序排列)。

//同步顺序消息[异步顺序 单向顺序写法类似]
public void testSyncSendOrderly()
{
//第三个参数用于队列的选择
rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息", "xxxx");
}

Rocketmq简介及部署、原理和使用介绍相关推荐

  1. canal简介及canal部署、TCP原理和使用介绍

    canal简介及canal部署.原理和使用介绍 canal入门 什么是canal canal使用场景 canal运行原理 MySQL的binlog介绍 什么是binlog 开启MySQL的binlog ...

  2. Maxwell简介、部署、原理和使用介绍

    Maxwell简介.部署.原理和使用介绍 1.Maxwell概述简介 1-1.Maxwell简介 ​ Maxwell是由美国Zendesk公司开源,使用Java编写的MySQL变更数据抓取软件.他会实 ...

  3. ElasticSearch简介及ElasticSearch部署、原理和使用介绍

    ElasticSearch简介及ElasticSearch部署.原理和使用介绍 第一章:elasticsearch简介 ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式 ...

  4. Flume简介及Flume部署、原理和使用介绍

    Flume简介及Flume部署.原理和使用介绍 Flume概述 ​ Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统.Flume基于流式架构,灵活简单. ...

  5. canal简介及canal部署、原理和使用介绍

    阿里canal简介及canal部署.原理和使用介绍 canal入门 什么是canal 阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求, ...

  6. azkaban简介及azkaban部署、原理和使用介绍

    azkaban简介及azkaban部署.原理和使用介绍 azkaban简介 ​ Azkaban是一套简单的任务调度服务,整体包括三部分webserver.dbserver.executorserver ...

  7. Pulsar简介及Pulsar部署、原理和使用介绍

    Pulsar简介及Pulsar部署.原理和使用介绍 Pulsar简介 诞生背景 Apache Pulsar 是一个企业级的分布式消息系统,最初由 Yahoo 开发,在 2016 年开源,并于2018年 ...

  8. CDH简介及CDH部署、原理和使用介绍( 版本6.3.1 )

    CDH简介及CDH部署.原理和使用介绍( 版本6.3.1 ) 第一章:CDH简介 CDH概念 ​ CDH是Cloudera的100%开源平台发行版,包括Apache Hadoop,专为满足企业需求而构 ...

  9. TIDB简介及TIDB部署、原理和使用介绍

    TiDB简介及TiDB部署.原理和使用介绍 从MySQL架构到TiDB 数据库分类 ​ 介绍TiDB数据库之前,先引入使用场景.如今的数据库种类繁多,RDBMS(关系型数据库).NoSQL(Not O ...

最新文章

  1. 这个时代,给了我们年轻人太多
  2. Multiple annotations found at this line: ---关于android string.xml %问题
  3. spring-test测试demo
  4. 多路复用器_多路复用、非阻塞、线程与协程
  5. 关于EasyCVR平台Ehome协议接入设备出现停止启用现象的原因分析
  6. Idea编写简单Java网络爬虫程序(maven)
  7. 单片机控制两个步进电机画圆_单片机控制的步进电机程序框图
  8. 这 4 个远程桌面开源了!
  9. mac sz rz file tras
  10. 板材品牌之生态板吊顶好还是桑拿板好
  11. 算法上的创新点大搜罗
  12. C#将自定义的时间字符串直接转换为UTC世界协调时间
  13. 接入微信自定义版交易组件3.0,小程序对接视频号操作说明
  14. android exo解码问题,Android Exoplayer音频播放异常
  15. 阿里巴巴余军:钉钉宜搭低代码实践之路
  16. Gibberish 本地化插件学习
  17. uclinux开发概述
  18. 小米笔记本电脑怎么使用U盘重装系统教学
  19. Tensorflow 自然语言处理
  20. springboot项目添加了logback-spring.xml配置文件不生效

热门文章

  1. HDU 2022 海选女主角
  2. c语言程序0到1000的素数,C语言实现之100-1000以内素数的等差数列
  3. 最近大火的 NFT 数字藏品是什么?
  4. 2021大厂Android面试经验,经典好文
  5. 二叉树的基本概念和性质
  6. 网络维护工程师的要求是什么?
  7. 如何在Linux中发现IP地址冲突
  8. SpringBoot word文件转pdf
  9. MATLAB 矢量场
  10. 搭建视频会议系统OpenMeetings