RocketMQ介绍

RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。

RocketMQ 特点

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer、Consumer、队列都可以分布式
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
  • 能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 JMS、OpenMessaging 等
  • 较少的依赖
    #kafka 、RocketMQ 、RabbitMQ 对比

RocketMQ安装

RocketMQ下载: rocketmq-all-4.8.0-bin-release.zip
1.RocketMQ zip包传入linux服务器

[root@localhost ]# cd usr/local/
[root@localhost local]# rz

2.解压缩

[root@localhost local]# unzip rocketmq-all-4.8.0-bin-release.zip

3.调整启动参数(修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败)

[root@localhost local]# cd rocketmq-all-4.8.0-bin-release/bin
[root@localhost bin]# vim runserver.sh

  • -Xms4g -Xmx4g -Xmn2g 改为 -Xms256m -Xmx256m -Xmn128m

4.调整broker

[root@localhost bin]# vim runbroker.sh

  • -Xms8g -Xmx8g -Xmn4g 改为 -Xms256m -Xmx256m -Xmn128m

5.启动namesrv

[root@localhost bin]# nohup sh mqnamesrv &

6.启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876

[root@localhost bin]# nohup ./mqbroker -n localhost:9876 &

7.检查是否启动成功

[root@localhost bin]# jps -l
  • 如果发现报错bash: jps: 未找到命令… 请更新以下命令
[root@localhost bin]# sudo yum install java-1.8.0-openjdk-devel.x86_64

  • 输入命令 jps -l

  • 关闭 RocketMQ 命令 (此处无需关闭,只用于了解)

./mqshutdown broker
./mqshutdown namesrv

#RocketMQ 控制台安装
1.克隆rocketmq项目

[root@localhost local]# cd /usr/local/
[root@localhost local]# git clone  https://github.com/apache/rocketmq-externals.git
  • 进入\rocketmq-externals\rocketmq-console\src\main\resources\ 下修改 application.properties 配置文件

  • 配置文件修改如下图

github提供了 Docker 和 非Docker 两种安装方法供其选择,这里使用非Docker方式进行安装

  • 在 \rocketmq-externals\rocketmq-console\ 文件夹下打开控制台,输入以下命令进行maven打包

mvn clean package -Dmaven.test.skip=true

  • 进入 \rocketmq-externals\rocketmq-console\target\ 文件夹下打开控制台,输入以下命令进行 jar包启动

java -jar rocketmq-console-ng-2.0.0.jar

  • 打开浏览器访问 localhost:9877,如果报错

  • 开放 10909 01911 9876 端口

firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

  • 验证RocketMQ功能是够正常

1.验证生产消息正常,输入命令

[root@localhost rocketmq-all-4.8.0-bin-release]# export NAMESRV_ADDR=localhost:9876
[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=7F000001372329453F44466341350068, offsetMsgId=C0A8017600002A9F000000000000674E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=33]

2.验证消费消息正常,执行如下命令:

[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1618387294736, bornHost=/192.168.1.118:34722, storeTimestamp=1618387294743, storeHost=/192.168.1.118:10911, msgId=C0A8017600002A9F0000000000000192, commitLogOffset=402, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=34, CONSUME_START_TIME=1618387666005, UNIQ_KEY=7F00000136FE29453F44466306100001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
  • 访问页面 http://localhost:9877 ,出现下图界面,安装成功

RocketMQ 集成 - 生产者

  • gateway下pom.xml文件添加依赖
<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
  • nacos 配置 RocketMQ

rocketmq:name-server: 192.168.190.129:9876producer:# 小坑:必须指定groupgroup: test-group
  • common 下创建实体类 MyMessage.class
package com.bi.cloud.pojo;import lombok.Data;import java.io.Serializable;
import java.util.Date;@Data
public class MyMessage implements Serializable {private Integer id;private String name;private String status;private Date createTime;
}
  • gateway下创建 TestProducerController.class
package com.bi.cloud.controller;import com.bi.cloud.pojo.MyMessage;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Date;/*** 生产者**/
@RestController
@RequestMapping("/api/testRocketMQ")
public class TestProducerController {/*** 用于发送消息到 RocketMQ 的api*/@Resourcepublic RocketMQTemplate rocketMQTemplate;@GetMapping("/sendMsg")public String testSendMsg() {String topic = "test-topic";MyMessage message = new MyMessage();message.setId(1);message.setName("王霄");message.setStatus("default");message.setCreateTime(new Date());// 发送消息rocketMQTemplate.convertAndSend(topic, message);return "send message success";}
}
  • Postman 调用接口

  • 如果报错 请关闭linux防火墙
systemctl stop firewalld
  • 消息发送成功后,可以到RocketMQ的控制台中进行查看:

RocketMQ 集成 - 消费者

  • engine下pom.xml文件添加依赖

<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
  • nacos 配置 RocketMQ

rocketmq:name-server: 192.168.190.129:9876producer:# 小坑:必须指定groupgroup: test-group
  • engine 下创建消费者监听器 TestConsumerListener.class

package com.bi.cloud.service.Impl;import com.alibaba.fastjson.JSON;
import com.bi.cloud.pojo.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 消费者监听器**/
@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener<MyMessage> {/*** 监听到消息的时候就会调用该方法*/@Overridepublic void onMessage(MyMessage message) {log.info("从test-topic中监听到消息");log.info(JSON.toJSONString(message));}
}
  • 编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:

前往:第八章 Oauth2.0 安全认证子模块集成

参考文献:
https://github.com/apache/rocketmq-externals.git
https://blog.csdn.net/qq_40280582/article/details/111785355
https://zhuhuix.blog.csdn.net/article/details/108866638
https://blog.51cto.com/zero01/2426303

微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)相关推荐

  1. 微服务 Spring Cloud Alibaba 项目搭建(三、common通用模块创建)

    一.项目 - New - Module 二.选择Maven - jdk- Next 三.修改Name为common - 查看GroupId是否正确 - Finish 四.修改pom.xml文件,放入通 ...

  2. spring cloud 实战项目搭建

    spring cloud 实战项目搭建 Spring Cloud简介 Spring Cloud是一个基于Spring Boot实现的云应用开发工具,它为基于JVM的云应用开发中的配置管理.服务发现.断 ...

  3. 微服务Spring Cloud

    Java服务器项目分类 现在市面上常见的java开发的项目可以分为两大类 1.企业级应用 一般指一个企业或机构内部使用的网站或服务器应用程序 包括的领域很多,包括并不限于:商业,企事业单位,医疗,军事 ...

  4. 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

    系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...

  5. 异构服务器 微服务_Spring Cloud Alibaba Sidecar 多语言微服务异构

    Spring Cloud Alibaba Sidecar 介绍 自 Spring Cloud Alibaba 2.1.1 版本后增加了 spring-cloud-alibaba-sidecar 模块作 ...

  6. 基于微服务spring cloud+vue+spring boot在线聊天交友软件的设计与实现

    针对企业和机构在聊天通讯软件中的的信息管理监控困难.系统稳定性不足.二次开发困难.部署运维困难问题,对一个自主可控,安全稳定的通讯系统进行分析设计. 采用前后端分离的开发模式,前端基于WEB端,采用基 ...

  7. Spring Cloud Alibaba 项目工程准备

    创建工程 Spring Cloud 项目都是基于 Spring Boot 进行开发,并且都是使用 Maven 做项目管理工具.在实际开发中,我们一般都会创建一个依赖管理项目作为 Maven 的 Par ...

  8. 干货实操:微服务Spring Cloud 系列(二) Eureka服务发现与服务注册(strand alone)

    此篇主要实操Eureka 服务端的服务注册,以及服务发现,并需要认证才能访问控制中心. 分五个部分说明: 一.  认识 Eureka 二.  Eureka  服务端开发 三.  Eureka 客户端开 ...

  9. 微服务Spring Cloud Eureka 服务端-基本配置(eureka.server.xxx)

    2019独角兽企业重金招聘Python工程师标准>>> Eureka配置系列: Eureka 客户端-基本配置 https://my.oschina.net/langxSpirit/ ...

最新文章

  1. ACMNO.43 C语言-成绩排序 利用结构体解决,是一个进步啦!
  2. seaborn可视化条形图并按照降序排序条形图进行可视化Sort Bars in Barplot in Descending Order in Python
  3. 如何限制oracle数据库表的输出记录条数
  4. 列表(list)、元组(tuple)、字典(dictionary)、array(数组)-numpy、DataFrame-pandas 、集合(set)...
  5. 端到端训练 联合训练_曲靖两家银行举行联合军事拓展训练 献礼祖国71周年华诞...
  6. 【CV秋季划】图像质量提升与编辑有哪些研究和应用,如何循序渐进地学习好?...
  7. 【渝粤教育】电大中专成本会计_1作业 题库
  8. 浅谈渗透测试之前期信息搜集
  9. python时间加一个月减一个月_python – 从pandas中的datetime列减去一年
  10. 如何在Linux操作系统查看文件创建时间
  11. 如何高效的使用Google
  12. 基于单片机的银行排队叫号系统的设计
  13. DEDECMS验证码不正确及其不显示问题
  14. 冬瓜哥直播:小白一小时掌握机器学习底层原理
  15. 手机**##,试试看
  16. 我的世界服务器物品在护甲,我的世界满级护甲指令,我的世界召唤盔甲架指令...
  17. 《计算机视觉与图像处理》最全总结之就业必备-小白易懂易上手
  18. 前端FormData详解
  19. 福大计算机专硕扩招,【今日话题】福州大学数计学院研究生又又又扩招?什么是拟录取补录?...
  20. linksys 打印软件_如何使用Linksys Smart Wi-Fi设置媒体和文件服务器

热门文章

  1. 全景丨0基础学习VR全景制作,平台篇第15章:热点功能-音图文
  2. 杨森翔书法;晨课【爨体练习】
  3. 杨森翔的书法作品【行书、爨体、隶书等作品】
  4. 电压抬升op电路分析
  5. executeUpdate()与executeQuery()的使用
  6. “非工作总结”之快门—我的镜头见过你
  7. 关于封装echarts的那些事
  8. 毕业三年,从P5到P7
  9. 你还在用笨重的 ELK?这个轻量级日志新贵了解一下
  10. GEO数据下载分析(SRA、SRR、GEM、SRX、SAMN、SRS、SRP、PRJNA全面解析)