本文所介绍环境为win7环境下运行, 从官方github中(https://github.com/alibaba/RocketMQ)下载RocketMQ-master.zip,版本为v3.5.8,解压并进入根目录,运行命令install.bat, 安装完成后进入目录\target\alibaba-rocketmq-broker\alibaba-rocketmq\bin,打开两个命令行窗口,分别使用以下命令启动rocketmq

启动nameserver
mqnamesrv.exe
启动broker
mqbroker -n 127.0.0.1:9876

1、编写pom.xml,

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hode</groupId><artifactId>rocketmq</artifactId><version>0.0.1-SNAPSHOT</version><properties><spring.version>4.3.2.RELEASE</spring.version><junit.version>4.12</junit.version><log4j.version>1.2.17</log4j.version><rocketmq.version>3.2.6</rocketmq.version><slf4j.version>1.7.12</slf4j.version></properties><dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version><scope>test</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>${spring.version}</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build></project>

2、编写spring配置文件applicationContext-consumer.xml,applicationContext-producer.xml以及log4j.properties,内容如下

applicationContext-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"><bean id="producer" class="com.hode.rocketmq.Consumer" init-method="init" destroy-method="destroy"><constructor-arg name="consumerGroup" value="rocketmq-test" /><constructor-arg name="namesrvAddr" value="127.0.0.1:9876" /><constructor-arg name="instanceName" value="test" /><constructor-arg name="topic" value="testTopic" /><constructor-arg name="messageListener" ref="messageListener" /></bean><bean id="messageListener" class="com.hode.rocketmq.StringMessageListener" /></beans>

applicationContext-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"><bean id="producer" class="com.hode.rocketmq.Producer" init-method="init" destroy-method="destroy"><constructor-arg name="producerGroup" value="rocketmq-test" /><constructor-arg name="namesrvAddr" value="127.0.0.1:9876" /><constructor-arg name="instanceName" value="test" /></bean></beans>

log4j.properties

log4j.rootLogger=INFO,Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%-4r %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %c %x - %m%nlog4j.logger.com.hode=DEBUG

3、编写类StringMessageListener.java,Producer.java,Consumer.java

StringMessageListener.java

import java.util.List;import org.apache.log4j.Logger;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;public class StringMessageListener implements MessageListenerConcurrently{private Logger log = Logger.getLogger(getClass());@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {log.info("msg : " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

Producer.java

import org.apache.log4j.Logger;import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;/*** 生产*/
public class Producer {protected Logger log = Logger.getLogger(getClass());private String producerGroup;private String namesrvAddr;private String instanceName;private DefaultMQProducer producer;public DefaultMQProducer getProducer() {return producer;}public Producer(String producerGroup,String namesrvAddr,String instanceName){this.producerGroup = producerGroup;this.namesrvAddr = namesrvAddr;this.instanceName = instanceName;}public void init() throws MQClientException{log.info("start init DefaultMQProducer...");producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.setInstanceName(instanceName);producer.start();log.info("DefaultMQProducer init success.");}public void destroy(){log.info("start destroy DefaultMQProducer...");producer.shutdown();log.info("DefaultMQProducer destroy success.");}}

Consumer.java

import org.apache.log4j.Logger;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;public class Consumer {private Logger log = Logger.getLogger(getClass());private DefaultMQPushConsumer consumer;private String consumerGroup;private String namesrvAddr;private String instanceName;private String topic;private MessageListenerConcurrently messageListener;public Consumer(String consumerGroup,String namesrvAddr,String instanceName,String topic,MessageListenerConcurrently messageListener){this.consumerGroup = consumerGroup;this.namesrvAddr = namesrvAddr;this.instanceName = instanceName;this.topic = topic;this.messageListener = messageListener;}public void init() throws Exception{log.info("start init DefaultMQPushConsumer...");consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //从队列头部开始消费consumer.setNamesrvAddr(namesrvAddr);consumer.setInstanceName(instanceName);consumer.subscribe(topic, "*");consumer.registerMessageListener(messageListener);consumer.start();log.info("DefaultMQPushConsumer init ok.");}public void destroy(){log.info("start destroy DefaultMQPushConsumer...");consumer.shutdown();log.info("DefaultMQPushConsumer destroy success.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}

4、编写测试类

ProducerTest.java

import java.util.Date;import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;public class ProducerTest {private static Logger log = Logger.getLogger(ProducerTest.class);private static ApplicationContext context;public static void main(String[] args) throws Exception{context = new ClassPathXmlApplicationContext("classpath:applicationContext-producer.xml");Producer producer = context.getBean(Producer.class);DefaultMQProducer p = producer.getProducer();String message = "test messgae"+new Date();Message msg = new Message("testTopic",message.getBytes());log.info(message);p.send(msg, new SendCallback(){@Overridepublic void onSuccess(SendResult sendResult) {log.info(sendResult.getSendStatus().name());log.info("onSuccess");producer.destroy();}@Overridepublic void onException(Throwable e) {log.error("onException");}});}}

ConsumerTest.java

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumerTest {private static ApplicationContext context;public static void main(String[] args) throws Exception{context = new ClassPathXmlApplicationContext("classpath:applicationContext-consumer.xml");Consumer consumer = context.getBean(Consumer.class);Thread.sleep(20*1000);System.out.println("end");consumer.destroy();}}

分别运行生产端及消费端完成测试,结束。

转载于:https://www.cnblogs.com/alterem/p/11249001.html

Spring + RocketMQ使用相关推荐

  1. 如何在优雅地Spring 中实现消息的发送和消费

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  2. 讲讲我和Spring创始级程序员共同review代码的故事

    RocketMQ-Spring毕业了. 作为Apache RocketMQ的子项目,经过6个多月的孵化,RocketMQ-Spring发布了第一个Release版本v2.0.1,通过使用Spring ...

  3. SpringBoot整合RocketMQ之环境搭建以及Producer发送消息

    https://github.com/apache/rocketmq-spring/releases/tag/2.0.0https://github.com/apache/rocketmq-sprin ...

  4. 如何在优雅地Spring 中实现消息的发送和消费 1

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  5. 消息队列:SpringBoot集成RocketMQ的那些坑(真实有效、附源码)

    技术不更新的坑 本着一颗爱自由.爱技术的心,不断在探索技术的路上前进,可是总是有一些坑是需要不断的去踩,去做一些改变来适应这个技术发展飞快的时代. 我用上了SpringBoot2.0+和JDK10.这 ...

  6. SpringBoot与RocketMQ客户端集成原理解读与示例

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  7. RocketMQ重试策略及与Springboot整合

    注:基础请参考上一篇文章RocketMQ简介与安装及入门 1. 重试策略 在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试需要分2种,分别是pro ...

  8. 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...

  9. 罗美琪和春波特的故事...

    作者 | 辽天 来源 | 阿里巴巴云原生公众号 **导读:**rocketmq-spring 经过 6 个多月的孵化,作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Releas ...

最新文章

  1. std::tuple类型
  2. 【转】【CUBE】Oracle分组函数之CUBE魅力
  3. 地铁里运用计算机视觉的场景,人工智能技术在地铁运营场景应用研究报告(26页)...
  4. 拦截导弹(最长递增子序列)
  5. IBM Java垃圾回收
  6. python之路day10-命名空间和作用域、函数嵌套,作用域链、闭包
  7. 【RS】Local Latent Space Models for Top- N Recommendation-利用局部隐含空间模型进行Top-N推荐...
  8. 周鸿祎谈李国庆夫妇互撕:大事男人说了算,小事才听女人的
  9. Spring 的configuration-metadata-annotation-processor 文档
  10. java motherfree video_Java Config 下的Spring Test方式
  11. 为PyCharm添加不同解释器
  12. JCR分区与中科院分区
  13. 数独问题流程图_算法实践——数独的基本解法
  14. JSP内置对象及部分常用方法
  15. java.io.IOException: Unable to establish loopback connection
  16. 用python画一个动态樱花
  17. Invalid Component definition:header
  18. POJ3737UmBasketella
  19. Oracle 错误总结及解决方法
  20. 四级单词pdf_2017年6月大学英语四级真题及答案解析(完整三套可打印)

热门文章

  1. Android小知识10则(上)(2018.8重编版)
  2. Python:numpy数组转换为json格式
  3. 【转】简单的解释XSS攻击
  4. glibc库详解及与POSIX,system V这些库之间关系的说明
  5. 了解自己计算机硬件设备信息
  6. python 面对对象思维导图_Python面向对象思维导图
  7. adb interface找不到驱动程序_Windows 10现支持更多设备的驱动程序更新
  8. 计算机英语四六级考试时间,英语四六级口语考试时间
  9. 5-8经典卷子神经网络结构介绍
  10. 数据科学入门与实战:玩转pandas实战项目分析航班晚点情况