rocketmq集成boot
3.1.启动
3.1.1.启动namesrv
nohup sh mqnamesrv > /dev/null 2>&1 &
3.1.2.修改broker.conf
vim …/conf/broker.conf
最下面添加两行
namesrvAddr = 192.168.5.128:9876
brokerIP1 = 192.168.5.128
Ip是虚拟机的ip
3.1.3.启动broker
nohup sh mqbroker -n 192.168.5.128:9876 autoCreateTopicEnable=true -c …/conf/broker.conf /dev/null 2>&1 &
只有这样启动,外网才能正常收发消息。
3.2.项目
3.2.1.配置文件
3.2.1.1.Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.study</groupId><artifactId>rocketmq-boot</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-parent</artifactId><version>1.5.3.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- RocketMq客户端相关依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.1.0-incubating</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.1.0-incubating</version></dependency></dependencies>
</project>
3.2.1.2.application.properties
# 消费者的组名
apache.rocketmq.consumerGroup=consumerGroup# 生产者的组名
apache.rocketmq.producerGroup=producerGroup# NameServer地址
apache.rocketmq.namesrvAddr=192.168.5.128:9876
3.2.2.Java文件
读取属性
package com.study.rocketmq.boot.config.property;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** Created by Administrator on 2018/4/27.*/
@Component
@ConfigurationProperties(prefix = "apache.rocketmq")
public class RocketmqProperty {private String consumerGroup;private String producerGroup;private String namesrvAddr;public String getConsumerGroup() {return consumerGroup;}public void setConsumerGroup(String consumerGroup) {this.consumerGroup = consumerGroup;}public String getProducerGroup() {return producerGroup;}public void setProducerGroup(String producerGroup) {this.producerGroup = producerGroup;}public String getNamesrvAddr() {return namesrvAddr;}public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}
}
常量
package com.study.rocketmq.boot.util;/*** Created by Administrator on 2018/4/27.*/
public class ConstantUtil {public static final String PUSH_TOPIC = "PushTopic"; //topicpublic static final String TAG = "push"; //tag}
发送
package com.study.rocketmq.boot.service;import com.study.rocketmq.boot.config.property.RocketmqProperty;
import com.study.rocketmq.boot.util.ConstantUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;/*** Created by Administrator on 2018/4/27.*/
@Service
public class Producer {@Autowiredprivate RocketmqProperty rocketmqProperty;@PostConstructprivate void init() {//生产者的组名DefaultMQProducer producer = new DefaultMQProducer(rocketmqProperty.getProducerGroup());//指定NameServer地址,多个地址以 ; 隔开producer.setNamesrvAddr(rocketmqProperty.getNamesrvAddr());try {/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可* 注意:切记不可以在每次发送消息时,都调用start方法*/producer.start();System.out.println("producer started");for (int i = 0; i < 3; i++) {String messageBody = "我是消息内容:" + i;String message = new String(messageBody.getBytes(), "utf-8");//构建消息Message msg = new Message(ConstantUtil.PUSH_TOPIC //PushTopic,ConstantUtil.TAG //Tag,"key_" + i //keys,message.getBytes() //body);//发送消息SendResult result = producer.send(msg);System.out.println("send success. MsgId = " + result.getMsgId() +", status = " + result.getSendStatus());}} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}
}
接收
package com.study.rocketmq.boot.service;import com.study.rocketmq.boot.config.property.RocketmqProperty;
import com.study.rocketmq.boot.util.ConstantUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;/*** Created by Administrator on 2018/4/27.*/
@Service
public class Consumer {@Autowiredprivate RocketmqProperty rocketmqProperty;@PostConstructprivate void init() {//消费者的组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketmqProperty.getConsumerGroup());//指定NameServer地址,多个地址以 ; 隔开consumer.setNamesrvAddr(rocketmqProperty.getNamesrvAddr());try {//订阅PushTopic下Tag为push的消息consumer.subscribe(ConstantUtil.PUSH_TOPIC, ConstantUtil.TAG);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费//如果非第一次启动,那么按照上次消费的位置继续消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt messageExt :list) {System.out.println("messageExt = " + messageExt);String messageBody = new String(messageExt.getBody(), "utf-8");System.out.println("consumer MsgId = " + messageExt.getMsgId() + ", msgBody = " + messageBody);}} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("consumer started");} catch (Exception e) {e.printStackTrace();}}
}
启动类
package com.study.rocketmq.boot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Created by Administrator on 2018/4/27.*/
@SpringBootApplication
public class RocketmqBootMain {public static void main(String[] args) {SpringApplication.run(RocketmqBootMain.class, args);}
}
3.2.3.成功
消费消息,是乱序的。
consumer started
producer started
send success. MsgId = C0A803CA2AFC18B4AAC2892531CC0000, status = SEND_OK
send success. MsgId = C0A803CA2AFC18B4AAC2892531E10001, status = SEND_OK
send success. MsgId = C0A803CA2AFC18B4AAC2892531E50002, status = SEND_OK
messageExt = MessageExt [queueId=2, storeSize=193, queueOffset=1, sysFlag=0, bornTimestamp=1524812916173, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915586, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AAB6, commitLogOffset=174774, bodyCRC=126356130, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key_0, CONSUME_START_TIME=1524812916246, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531CC0000, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531CC0000, msgBody = 我是消息内容:0
messageExt = MessageExt [queueId=0, storeSize=193, queueOffset=0, sysFlag=0, bornTimestamp=1524812916197, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915604, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AC38, commitLogOffset=175160, bodyCRC=1770417038, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=key_2, CONSUME_START_TIME=1524812916246, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531E50002, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531E50002, msgBody = 我是消息内容:2
messageExt = MessageExt [queueId=3, storeSize=193, queueOffset=1, sysFlag=0, bornTimestamp=1524812916193, bornHost=/192.168.5.1:58664, storeTimestamp=1524812915599, storeHost=/192.168.5.128:10911, msgId=C0A8058000002A9F000000000002AB77, commitLogOffset=174967, bodyCRC=1888434740, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=key_1, CONSUME_START_TIME=1524812916245, UNIQ_KEY=C0A803CA2AFC18B4AAC2892531E10001, WAIT=true, TAGS=push}, body=20]]
consumer MsgId = C0A803CA2AFC18B4AAC2892531E10001, msgBody = 我是消息内容:1
2018-04-27 15:08:36.393 INFO 11004 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-04-27 15:08:36.411 INFO 11004 --- [ main] c.study.rocketmq.boot.RocketmqBootMain : Started RocketmqBootMain in 4.215 seconds (JVM running for 4.797)
rocketmq集成boot相关推荐
- RocketMQ集成SpringBoot
RocketMQ集成SpringBoot RocketMQ总体架构 RocketMQ基本特性
- 微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)
RocketMQ介绍 RocketMQ 是一个 队列模型 的消息中间件,具有高性能.高可靠.高实时.分布式 的特点.它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年 ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- 芋道 Spring Boot 消息队列 RocketMQ 入门
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- 云原生消息、事件、流超融合平台——RocketMQ 5.0 初探
简介:今天分享的主题是云原生消息事件流超融合平台 RocketMQ 5.0 初探,内容主要分为三个部分: 首先,带大家回顾业务消息领域首选 RocketMQ 4 发展历史以及 4.x 版本的演进与发展 ...
- RocketMQ保姆级教程
上周花了一点时间从头到尾.从无到有地搭建了一套RocketMQ的环境,觉得还挺easy的,所以就写篇文章分享给大家. 整篇文章可以大致分为三个部分,第一部分属于一些核心概念和工作流程的讲解:第二部分就 ...
- 【十万字的SpringCloud,你不来看看】
黑马笔记 目录 微服务和springcloud介绍 [1]服务集群 [2]技术导览 [3]认识微服务 1.单体架构 2.分布式架构 3.微服务是什么 解决分布式架构的缺点 [4]国内知名微服务治理 ...
- SaaS 系统架构,租户数据隔离模式与租户信息解析方案!
这段时候在准备从零开始做一套SaaS系统,之前的经验都是开发单数据库系统并没有接触过SaaS系统,所以接到这个任务的时候也有也些头疼,不过办法部比困难多,难得的机会. 在网上找了很多关于SaaS的资料 ...
- Spring boot 集成rocketMQ 官方文档
RocketMQ-Spring 原文地址:https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md 帮助开发者在Spr ...
最新文章
- 安利一个超好用的 Pandas 数据挖掘分析神器
- ionic开发中页面跳转隐藏底部Ttab
- Docker-compose实战——Django+PostgreSQL
- 斯坦福华人教授:声波、光波,其实都是RNN!机器学习模型对应
- 包裹遭联邦快递不正常“转运” 华为:将重新审视双方合作关系
- 无服务器TOP3大关键问题及解决方案
- 1000并发的系统服务器配置,1000人并发服务器配置
- Atitit 未来 技术趋势 没落技术 attilax著 艾龙 总结 1. 2018技术趋势	2 1.1. 人工智能与区块链	2 1.2. 2、 PWA 或将大热	2 1.3. 5、
- python获取当前时间戳_Python获取时间戳代码实例
- 隐藏软键盘与弹窗总结
- python参考文献期刊格式_论文参考文献格式
- stc15流水灯c语言,STC89C52单片机流水灯
- Windows Server 2012远程默认端口3389的修改
- 【明解C语言】选择语句之switch
- 中美知识产权博弈:保护力度标准成最大分歧
- 【推荐+转摘】如何又快又好的做出一份优质PPT
- 【中级软考】cache是什么?(高速缓冲存储器)
- webpack开发配置API代理proxy,解决跨域问题
- CentOS7.4安装NVIDIA显卡驱动 GT730
- xp系统访问共享服务器提示无网络路径,XP提示“无任何网络提供程序接受指定的网络路径”如何解决...