3.1.启动

3.1.1.启动namesrv
nohup sh mqnamesrv > /dev/null 2>&1 &

3.1.2.修改broker.conf
vim …/conf/broker.conf
最下面添加两行

namesrvAddr = 192.168.5.128:9876
brokerIP1 = 192.168.5.128

Ip是虚拟机的ip

3.1.3.启动broker
nohup sh mqbroker -n 192.168.5.128:9876 autoCreateTopicEnable=true -c …/conf/broker.conf /dev/null 2>&1 &

只有这样启动,外网才能正常收发消息。

3.2.项目

3.2.1.配置文件
3.2.1.1.Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.study</groupId><artifactId>rocketmq-boot</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-parent</artifactId><version>1.5.3.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- RocketMq客户端相关依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.1.0-incubating</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.1.0-incubating</version></dependency></dependencies>
</project>

3.2.1.2.application.properties

# 消费者的组名
apache.rocketmq.consumerGroup=consumerGroup# 生产者的组名
apache.rocketmq.producerGroup=producerGroup# NameServer地址
apache.rocketmq.namesrvAddr=192.168.5.128:9876

3.2.2.Java文件
读取属性

package com.study.rocketmq.boot.config.property;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** Created by Administrator on 2018/4/27.*/
@Component
@ConfigurationProperties(prefix = "apache.rocketmq")
public class RocketmqProperty {private String consumerGroup;private String producerGroup;private String namesrvAddr;public String getConsumerGroup() {return consumerGroup;}public void setConsumerGroup(String consumerGroup) {this.consumerGroup = consumerGroup;}public String getProducerGroup() {return producerGroup;}public void setProducerGroup(String producerGroup) {this.producerGroup = producerGroup;}public String getNamesrvAddr() {return namesrvAddr;}public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}
}

常量

package com.study.rocketmq.boot.util;/*** Created by Administrator on 2018/4/27.*/
public class ConstantUtil {public static final String PUSH_TOPIC = "PushTopic";    //topicpublic static final String TAG = "push";    //tag}

发送

package com.study.rocketmq.boot.service;import com.study.rocketmq.boot.config.property.RocketmqProperty;
import com.study.rocketmq.boot.util.ConstantUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;/*** Created by Administrator on 2018/4/27.*/
@Service
public class Producer {@Autowiredprivate RocketmqProperty rocketmqProperty;@PostConstructprivate void init() {//生产者的组名DefaultMQProducer producer = new DefaultMQProducer(rocketmqProperty.getProducerGroup());//指定NameServer地址,多个地址以 ; 隔开producer.setNamesrvAddr(rocketmqProperty.getNamesrvAddr());try {/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可* 注意:切记不可以在每次发送消息时,都调用start方法*/producer.start();System.out.println("producer started");for (int i = 0; i < 3; i++) {String messageBody = "我是消息内容:" + i;String message = new String(messageBody.getBytes(), "utf-8");//构建消息Message msg = new Message(ConstantUtil.PUSH_TOPIC //PushTopic,ConstantUtil.TAG //Tag,"key_" + i //keys,message.getBytes() //body);//发送消息SendResult result = producer.send(msg);System.out.println("send success. MsgId = " + result.getMsgId() +", status = " + result.getSendStatus());}} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}
}

接收

package com.study.rocketmq.boot.service;import com.study.rocketmq.boot.config.property.RocketmqProperty;
import com.study.rocketmq.boot.util.ConstantUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;/*** Created by Administrator on 2018/4/27.*/
@Service
public class Consumer {@Autowiredprivate RocketmqProperty rocketmqProperty;@PostConstructprivate void init() {//消费者的组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketmqProperty.getConsumerGroup());//指定NameServer地址,多个地址以 ; 隔开consumer.setNamesrvAddr(rocketmqProperty.getNamesrvAddr());try {//订阅PushTopic下Tag为push的消息consumer.subscribe(ConstantUtil.PUSH_TOPIC, ConstantUtil.TAG);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费//如果非第一次启动,那么按照上次消费的位置继续消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt :list) {System.out.println("messageExt = " + messageExt);String messageBody = new String(messageExt.getBody(), "utf-8");System.out.println("consumer MsgId = " + messageExt.getMsgId() + ", msgBody = " + messageBody);}} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("consumer started");} catch (Exception e) {e.printStackTrace();}}
}

启动类

package com.study.rocketmq.boot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Created by Administrator on 2018/4/27.*/
@SpringBootApplication
public class RocketmqBootMain {public static void main(String[] args) {SpringApplication.run(RocketmqBootMain.class, args);}
}

3.2.3.成功

消费消息,是乱序的。

consumer started
producer started
send success. MsgId = C0A803CA2AFC18B4AAC2892531CC0000, status = SEND_OK
send success. MsgId = C0A803CA2AFC18B4AAC2892531E10001, status = SEND_OK
send success. MsgId = C0A803CA2AFC18B4AAC2892531E50002, status = SEND_OK
messageExt = MessageExt [queueId=2, storeSize=193, queueOffset=1, sysFlag=0, bornTimestamp=1524812916173, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915586, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AAB6, commitLogOffset=174774, bodyCRC=126356130, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key_0, CONSUME_START_TIME=1524812916246, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531CC0000, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531CC0000, msgBody = 我是消息内容:0
messageExt = MessageExt [queueId=0, storeSize=193, queueOffset=0, sysFlag=0, bornTimestamp=1524812916197, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915604, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AC38, commitLogOffset=175160, bodyCRC=1770417038, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=key_2, CONSUME_START_TIME=1524812916246, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531E50002, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531E50002, msgBody = 我是消息内容:2
messageExt = MessageExt [queueId=3, storeSize=193, queueOffset=1, sysFlag=0, bornTimestamp=1524812916193, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915599, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AB77, commitLogOffset=174967, bodyCRC=1888434740, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key_1, CONSUME_START_TIME=1524812916245, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531E10001, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531E10001, msgBody = 我是消息内容:1
2018-04-27 15:08:36.393  INFO 11004 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-04-27 15:08:36.411  INFO 11004 --- [           main] c.study.rocketmq.boot.RocketmqBootMain   : Started RocketmqBootMain in 4.215 seconds (JVM running for 4.797)

rocketmq集成boot相关推荐

  1. RocketMQ集成SpringBoot

    RocketMQ集成SpringBoot RocketMQ总体架构 RocketMQ基本特性

  2. 微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)

    RocketMQ介绍 RocketMQ 是一个 队列模型 的消息中间件,具有高性能.高可靠.高实时.分布式 的特点.它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年 ...

  3. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  4. 芋道 Spring Boot 消息队列 RocketMQ 入门

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  5. 云原生消息、事件、流超融合平台——RocketMQ 5.0 初探

    简介:今天分享的主题是云原生消息事件流超融合平台 RocketMQ 5.0 初探,内容主要分为三个部分: 首先,带大家回顾业务消息领域首选 RocketMQ 4 发展历史以及 4.x 版本的演进与发展 ...

  6. RocketMQ保姆级教程

    上周花了一点时间从头到尾.从无到有地搭建了一套RocketMQ的环境,觉得还挺easy的,所以就写篇文章分享给大家. 整篇文章可以大致分为三个部分,第一部分属于一些核心概念和工作流程的讲解:第二部分就 ...

  7. 【十万字的SpringCloud,你不来看看】

    黑马笔记 目录 微服务和springcloud介绍 [1]服务集群 [2]技术导览 [3]认识微服务 1.单体架构 2.分布式架构 3.微服务是什么   解决分布式架构的缺点 [4]国内知名微服务治理 ...

  8. SaaS 系统架构,租户数据隔离模式与租户信息解析方案!

    这段时候在准备从零开始做一套SaaS系统,之前的经验都是开发单数据库系统并没有接触过SaaS系统,所以接到这个任务的时候也有也些头疼,不过办法部比困难多,难得的机会. 在网上找了很多关于SaaS的资料 ...

  9. Spring boot 集成rocketMQ 官方文档

    RocketMQ-Spring 原文地址:https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md 帮助开发者在Spr ...

最新文章

  1. 安利一个超好用的 Pandas 数据挖掘分析神器
  2. ionic开发中页面跳转隐藏底部Ttab
  3. Docker-compose实战——Django+PostgreSQL
  4. 斯坦福华人教授:声波、光波,其实都是RNN!机器学习模型对应
  5. 包裹遭联邦快递不正常“转运” 华为:将重新审视双方合作关系
  6. 无服务器TOP3大关键问题及解决方案
  7. 1000并发的系统服务器配置,1000人并发服务器配置
  8. Atitit 未来 技术趋势 没落技术 attilax著 艾龙 总结 1. 2018技术趋势 2 1.1. 人工智能与区块链 2 1.2. 2、 PWA 或将大热 2 1.3. 5、
  9. python获取当前时间戳_Python获取时间戳代码实例
  10. 隐藏软键盘与弹窗总结
  11. python参考文献期刊格式_论文参考文献格式
  12. stc15流水灯c语言,STC89C52单片机流水灯
  13. Windows Server 2012远程默认端口3389的修改
  14. 【明解C语言】选择语句之switch
  15. 中美知识产权博弈:保护力度标准成最大分歧
  16. 【推荐+转摘】如何又快又好的做出一份优质PPT
  17. 【中级软考】cache是什么?(高速缓冲存储器)
  18. webpack开发配置API代理proxy,解决跨域问题
  19. CentOS7.4安装NVIDIA显卡驱动 GT730
  20. xp系统访问共享服务器提示无网络路径,XP提示“无任何网络提供程序接受指定的网络路径”如何解决...

热门文章

  1. 没有上市的股权和股票有啥区别
  2. 女生天天和我微信语音5小时以上,突然没有联系,应该怎么办?
  3. 朋友圈最忌讳发什么?
  4. 文案一方面需要创意,但一方面不需要过分沉溺于创意
  5. 新零售时代招商的新鲜玩法——用全网联动 促销活动来招商
  6. 成功唯一的通道就是必须迷上你所做的事
  7. 想赚钱,需要脑袋能开窍
  8. 现在很多富人有钱了,就喜欢去付费学习
  9. 晚上睡觉的时候应该把wifi关掉吗?
  10. 由遍历序列构造二叉树