windows下RocketMQ安装部署

1.系统

Windows

2. 环境

JDK1.8、Maven、Git

二. RocketMQ部署

1.下载

1.1地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/

1.2选择‘Binary’进行下载

1.3解压已下载工程

2. 配置

2.1 系统环境变量配置

变量名:ROCKETMQ_HOME

变量值:MQ解压路径\MQ文件夹名

2.2  重启服务器

3. 启动

3.1 启动NAMESERVER

Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。

3.2 启动BROKER

Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。

三. RocketMQ插件部署

1. 下载

地址:https://github.com/apache/rocketmq-externals.git

下载完成之后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。

2. 编译启动

2.1  进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。

2.2  编译成功之后,Cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.0.jar’,启动‘rocketmq-console-ng-1.0.0.jar’。

3.测试

浏览器中输入‘127.0.0.1:配置端口’,成功后即可查看。

RocketMQ与SpringBoot整合

1、修改配置文件

#该应用是否启用生产者
rocketmq:producer:isOnOff: on#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示groupName: ${spring.application.name}#mq的nameserver地址namesrvAddr: 127.0.0.1:9876#消息最大长度 默认1024*4(4M)maxMessageSize: 4096#发送消息超时时间,默认3000sendMsgTimeout: 3000#发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2###consumer##该应用是否启用消费者consumer:isOnOff: ongroupName: ${spring.application.name}#mq的nameserver地址namesrvAddr: 127.0.0.1:9876#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;topics: futaotopic~*;consumeThreadMin: 20consumeThreadMax: 64#设置一次消费消息的条数,默认为1条consumeMessageBatchMaxSize: 1reConsumerTimes: 3

2.配置生产者

package com.wx.wxjob.config;import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;/*** @program: wx-job* @description: RoceketMQ* @author: pengyd* @create: 2019-10-10 14:44**/
@Configuration
@Slf4j
public class MQProducerConfig {@Value("${rocketmq.consumer.namesrvAddr}")private String consumerNamesrvAddr;@Value("${rocketmq.consumer.groupName}")private String consumerGroupName;@Value("${rocketmq.consumer.consumeThreadMin}")private int consumeThreadMin;@Value("${rocketmq.consumer.consumeThreadMax}")private int consumeThreadMax;@Value("${rocketmq.consumer.topics}")private String topics;@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")private int consumeMessageBatchMaxSize;@Beanpublic DefaultMQPushConsumer consumer() {String topic = "test";String tag = "test";if (this.consumerGroupName.isEmpty()) {throw new RuntimeException("consumerGroupName  isEmpty ");}if (this.consumerNamesrvAddr.isEmpty()) {throw new RuntimeException("consumerNamesrvAddr  isEmpty ");}if (this.topics.isEmpty()) {throw new RuntimeException("topics  isEmpty ");}try {//DefaultMQPushConsumer DefaultMQPullConsumerDefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroupName);defaultMQPushConsumer.setNamesrvAddr(consumerNamesrvAddr);defaultMQPushConsumer.setConsumeThreadMin(consumeThreadMin);defaultMQPushConsumer.setVipChannelEnabled(false);
//        defaultMQPushConsumer.createTopic()defaultMQPushConsumer.setConsumeThreadMax(consumeThreadMax);//消费模式 集群还是广播,默认为集群(自动负载均衡)//广播消费: 消息会发给Consume Group中的每一个消费者进行消费,如果设置为广播消息会导致NOT_ONLINE异常,https://github.com/apache/rocketmq/issues/296defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);// 设置消费模型,//consumer.setMessageModel(MessageModel.CLUSTERING);// * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// * 如果非第一次启动,那么按照上次消费的位置继续消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//设置一次消费消息的条数,默认为1条defaultMQPushConsumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);//订阅topicdefaultMQPushConsumer.subscribe(topic, tag);//        defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor)defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt msg = list.get(0);System.out.println("接收到的消息为:" + new String(msg.getBody(), Charset.forName("utf-8")));if (msg.getTopic().equals(topic) && msg.getTags().equals(tag)) {//判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)//获取该消息重试次数if (msg.getReconsumeTimes() >= 3) {//消息已经重试了3次,如果不需要再次消费,则返回成功//TODO("如果重试了三次还是失败则执行对于失败的业务逻辑")log.error("消息重试消费失败:", msg);System.out.println(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);} else {//如果失败重试次数还没到三次则继续重试System.out.println(ConsumeConcurrentlyStatus.RECONSUME_LATER);}//TODO("开始正常的业务逻辑")System.out.println("开始正常的业务逻辑:"+new String(msg.getBody(), Charset.forName("utf-8")));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});defaultMQPushConsumer.start();log.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics);return defaultMQPushConsumer;} catch (Exception e) {log.error("rocketMq Consumer start fail;{}", e.getMessage(), e);return new DefaultMQPushConsumer();}}}

3.配置消费者

package com.wx.wxjob.config;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @program: wx-job* @description: RoceketMQ* @author: pengyd* @create: 2019-10-10 14:44**/
@Configuration
@Slf4j
public class MQConsumerConfig {@Value("${rocketmq.producer.groupName}")private String producerGroupName;@Value("${rocketmq.producer.namesrvAddr}")private String producerNameSrvAddr;@Value("${rocketmq.producer.maxMessageSize}")private int maxMessageSize;@Value("${rocketmq.producer.sendMsgTimeout}")private int sendMsgTimeout;@Value("${rocketmq.producer.retryTimesWhenSendFailed}")private int retryTimesWhenSendFailed;@Beanpublic DefaultMQProducer producer(){if (this.producerGroupName.isEmpty()) {throw new RuntimeException("producerGroupName  isEmpty ");}if (this.producerNameSrvAddr.isEmpty()) {throw  new RuntimeException("producerNameSrvAddr  isEmpty ");}DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroupName);defaultMQProducer.setNamesrvAddr(producerNameSrvAddr);defaultMQProducer.setMaxMessageSize(maxMessageSize);defaultMQProducer.setSendMsgTimeout(sendMsgTimeout);defaultMQProducer.setVipChannelEnabled(false);//消息发送到mq服务器失败重试次数defaultMQProducer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);try {defaultMQProducer.start();log.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNameSrvAddr, producerGroupName);} catch (Exception e) {log.error("rocketMq Producer start fail;{}", e.getMessage(), e);}return defaultMQProducer;}}

4.测试controller

@RequestMapping("/succ")@ResponseBodypublic String succ() {logger.info("1111111111111111111111111111111111111");Message message = new Message("test", "test", (new Date().toString() + "这是测试mq").getBytes());try {producer.send(message);} catch (Exception e) {e.printStackTrace();}return "success";}

w10本地安装RocketMq(插件)以及整合springboot相关推荐

  1. Redis 安装配置开机启动整合SpringBoot以及配置文件详解

    安装 Redis # 下载Redis wget https://download.redis.io/releases/redis-6.0.9.tar.gz# 解压 redis tar -zxvf re ...

  2. 本地缓存Caffeine详解+整合SpringBoot的@EnableCaching

    目录 前言: Caffeine详解 加载策略 同步 异步,即多线程加载 回收策略 回收策略-数量 回收策略-权重 回收策略-时间 回收策略-软引用/弱引用 移除监听 统计 整合SpringBoot @ ...

  3. 安装谷歌插件 ~ 一招轻松解决

    在安装谷歌插件的时候,由于网络原因经常失败.今天和大家分享如何在本地安装谷歌插件,希望可以帮助到有需要的小伙伴~ 赶快跟小编一起来看看吧 第一步: 由于谷歌浏览器的插件默认是不能在本地安装的,我们需要 ...

  4. Eclipse安装SVN插件图文详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.在线方式安装SVN插件 最新的Subclipse支持Eclipse 3.2及以上版本(MyEclipse 5.0+.Zen ...

  5. redis安装到熟练整合springboot(篇幅较长内容详细)

    尚硅谷课程redis学习笔记 1.在linux下安装redis数据库 这篇文章介绍安装 redis介绍: Redis是一个开源的key-value存储系统. 和Memcached类似,它支持存储的va ...

  6. 【Flutter】shared_preferences 本地存储 ( 简介 | 安装 shared_preferences 插件 | 使用 shared_preferences 流程 )

    文章目录 一.shared_preferences 本地存储插件简介 二.安装 shared_preferences 插件 三.使用 shared_preferences 流程 四.完整代码示例 五. ...

  7. 查看搜狗浏览器插件的本地安装位置

    查看搜狗浏览器插件的本地安装位置 使用命令行查看: 同时按下windows键以及R键,输入如下命令即可 %appdata%/Sogouexplorer\Extension 直接打开文件夹 插件安装的文 ...

  8. Edge导出crx插件Chrome安装本地的crx插件

    1. Edge导出crx插件 参考:Edge"打包扩展",导出并备份浏览器插件安装文件的方法 以打包密码管理器插件 Bitwarden 为例. 记住上图绿框内Bitwarden的I ...

  9. Windows达梦数据库安装及整合SpringBoot

    安装教程 前言 下载与安装 配置数据库 使用DM管理工具连接数据库并创建模式(数据库)和表 SpringBoot整合DM数据库的基本配置 后续 前言 由于公司的一个项目要实现软硬件国产化,需要用到达梦 ...

最新文章

  1. 科技部公布2017年独角兽名单,来看看有哪些人工智能企业?
  2. weka: exhaustive search
  3. TCP/IP网络中专有名词注解
  4. 彩色图像灰度化MFC
  5. golang读取pdf
  6. npoi xlsx转换html,NPOI导Excel样式设置(转)
  7. 2021-2027全球与中国射频发生器市场现状及未来发展趋势
  8. python爬虫进阶-汽车之家贴吧信息(字体反爬-动态映射)
  9. sk_buff 介绍
  10. 只要付出了努力,总会有回报的
  11. linux 4.6发布时间,Linux Kernel 4.6的第4个维护版本发布
  12. DC-DC电源输出纹波测量方法
  13. 【ironic】ironic介绍与原理
  14. java 求tan的角度_Java StrictMath tan()用法及代码示例
  15. html入门之用html给女朋友写封精致的情书--小白直接拿去用,一点难度都没有
  16. mac修改mysql端口
  17. 咸鱼洽谈(linux三剑客之一grep)美好的周五生活开始了~
  18. 如何计算机网络打印机驱动程序,如何安装打印机驱动程序,教您如何给电脑安装打印机驱动程序...
  19. Android 调用系统剪裁工具剪裁用户头像
  20. Oracle全球裁员潮:云计算成趋势?

热门文章

  1. 面试回答问题的小套路
  2. 华为手机无信号显示无服务器,华为手机插卡显示无服务怎么办
  3. 函数奇偶性运算法则,以及复合函数奇偶性判断 ln的运算法则,对数函数运算法则 对数函数运算法则口诀,简单记忆
  4. 经典脂质组学检测定量脂质组学检测-百趣生物
  5. 小米九月十五升级鸿蒙系统,小米手机鸿蒙系统申请升级刷机包-小米手机鸿蒙系统申请平台预约v1.0.0预约-七度网...
  6. 伺服电机驱动器的一些简单相关分析
  7. apap之web dynpro for abap----ALV初始化
  8. 解决ADAMS启动没有欢迎对话框的解决办法
  9. 这事真麻烦!备案网站迁移的要注意,不然网站会被判定为空壳网站
  10. Java自学之旅07