SpringMQ的使用
文章目录
- SpringMQ的使用
- 1、windows安装
- 2、添加依赖:
- 3、增加rabbit的配置:
- 4、最简单的测试:publisher--->MQ-->consumer
- 4.1、建立连接配置:
- 4.2、发送端:建立连接,获取通道,创建队列,准备消息,发送消息到队列:
- 4.3 、接收端:建立连接,获取通道,声明队列,申请队列的一个消费者(内含监听消息的方法),在通道线路上接收消息。
- 4.4 消息接收的手动确认:
- 5、work消息模型
- 5.1 消息发送:
- 5.2 消息接收:
- 6、订阅模型分类:
- 6.1 fanout:也就是交换机广播消息
- 6.1.1 发送消息:建立连接,获取通道,声明交换机,发送消息到交换机
- 6.1.2 接受消息:建立连接,获取通道,声明队列,绑定到交换机,定义消费者(包含监听),监听通道。
- 6.2 direct:相当于定向投放
- 6.2.1 发送消息: 交换机类型指定为direct,发送消息时指定routing key
- 6.2.2 接收消息: 获取通道,声明队列,绑定交换机,指定routing key
- 6.3 Topic:交换机采用通配符的方式和队列匹配
- 6.3.1 发送消息: 指定交换机类型,发送消息携带route key
- 6.3.2 接收消息: 声明队列,绑定交换机携带带通配符的route key
- 7、持久化
- 7.1 交换机持久化
- 7.2 队列持久化
- 7.3 消息持久化
- 8、spring和MQ整合
- 8.1 设置接收消息的监听器: 通过注解@RabbitListener实现,绑定队列,交换机,和route-key
- 8.2接收消息:通过AmqpTemplate实现
SpringMQ的使用
1、windows安装
安装erlang,配置环境变量,安装rabbitmq-server
访问:http://localhost:15672/
2、添加依赖:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3、增加rabbit的配置:
spring:rabbitmq:host: 127.0.0.1username: rootpassword: 880808virtual-host: /
4、最简单的测试:publisher—>MQ–>consumer
4.1、建立连接配置:
public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("12345678");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}
4.2、发送端:建立连接,获取通道,创建队列,准备消息,发送消息到队列:
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
4.3 、接收端:建立连接,获取通道,声明队列,申请队列的一个消费者(内含监听消息的方法),在通道线路上接收消息。
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
4.4 消息接收的手动确认:
DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);
5、work消息模型
5.1 消息发送:
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}
5.2 消息接收:
6、订阅模型分类:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
我们这里先学习
Fanout:即广播模式
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
6.1 fanout:也就是交换机广播消息
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个消费者
- 2) 每个消费者有自己的queue(队列)
- 3) 每个队列都要绑定到Exchange(交换机)
- 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 5) 交换机把消息发送给绑定过的所有队列
- 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
6.1.1 发送消息:建立连接,获取通道,声明交换机,发送消息到交换机
public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello everyone";// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生产者] Sent '" + message + "'");channel.close();connection.close();}
}
6.1.2 接受消息:建立连接,获取通道,声明队列,绑定到交换机,定义消费者(包含监听),监听通道。
//消费者1
public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
6.2 direct:相当于定向投放
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
6.2.1 发送消息: 交换机类型指定为direct,发送消息时指定routing key
发送消息的RoutingKey分别是:insert、update、delete
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "商品新增了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}
6.2.2 接收消息: 获取通道,声明队列,绑定交换机,指定routing key
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
6.3 Topic:交换机采用通配符的方式和队列匹配
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
6.3.1 发送消息: 指定交换机类型,发送消息携带route key
public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "新增商品 : id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}
6.3.2 接收消息: 声明队列,绑定交换机携带带通配符的route key
public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
7、持久化
7.1 交换机持久化
7.2 队列持久化
7.3 消息持久化
8、spring和MQ整合
8.1 设置接收消息的监听器: 通过注解@RabbitListener实现,绑定队列,交换机,和route-key
@Component
public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "true"),exchange = @Exchange(value = "spring.test.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"#.#"}))public void listen(String msg){System.out.println("接收到消息:" + msg);}}
8.2接收消息:通过AmqpTemplate实现
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void testSend() throws InterruptedException {String msg = "hello, Spring boot amqp";this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);// 等待10秒后再结束Thread.sleep(10000);}
}
SpringMQ的使用相关推荐
- java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费
摘选:https://my.oschina.net/u/3613230/blog/1457227 摘要: 最近在项目开发中,需要用到activemq,用的时候,发现在同一个项目中point-to-po ...
- spring整合使用activemq
activemq简单的demo这里就不做演示了,今天介绍一下如何利用spring整合activemq,也是实际工作中涉及到的,希望对各位伙伴有所帮助, 1.安装activemq,为演示方便,我已经提前 ...
- ActiveMQ死信产生的原因及使用方案
DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息. 出现以下情况时,消息会被redelivered A transacted session is used and ...
- ActiveMQ的简单使用
转:http://wosyingjun.iteye.com/blog/2314681 ActiveMQ的简单使用 ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程 ...
- sheng的学习笔记-activeMQ框架原理
目录 搭建环境 如果需要修改访问端口的话 如果需要修改用户名和密码的话 重启 ActiveMQ 配置文件 activemq.xml 基础理论: 什么是 JMS JMS基本开发步骤: 点对点模型(Poi ...
最新文章
- 聪明人的游戏(3)镜子颠倒了什么?
- python自学需要多久-自学Python多久能找到工作
- c语言程序构造数据类型问题,C语言程序设计课程课件之第四章简单构造数据类型.ppt...
- Visual Studio 2008 断点调试直接跳出代码窗口
- mysql8导出文件_windows下 Mysql 8.0.x 数据库简单的导出和导入!!!
- 原生js 基于canvas写一个简单的前端 截图工具
- 新冠隔离让你家Wifi变慢?全球网络大塞车AI缓拥堵,边缘计算或成杀手锏
- java word另存为_Java 网页html转为word并保存为doc文件
- mysql测试什么鬼,where 1=1 是什么鬼?SQL中有这玩意?
- APP 上传之后出现invalid binary 问题解决汇总
- 用組件封裝數據庫操作(一)
- 20155338《网络对抗》Web安全基础实践
- 银河麒麟系统查看网络设置命令_银河麒麟配置说明
- 今日头条信息流 - 基础账户实操
- 材料工程计算机技术应用,计算机在材料工程中的应用
- iOS 强制屏幕实现旋转功能
- 2021高考厦门科技中学成绩查询,2021年厦门重点高中名单及排名,厦门高中高考成绩排名榜...
- linux shell题库,shell习题-30
- 错误SyntaxError: (unicode error) truncated \UXXXXXXXX escape的解决方法
- ArtWork.Conversion.
热门文章
- 关于TP框架的微信开发服务器配置TOKEN验证失败解决方案
- 拷贝依赖_还不懂零拷贝(Zero-Copy)?怎么称得上高级程序员
- 安徽大学计算机教学平台c语言作业,安徽大学计算机基础C语言选择题
- sql a 表 若包含b表 则a 表 列显示_几道常见的SQL面试题,看你能答对几道?
- java课程总结_java课程总结报告.doc
- zabbix mysql监控告警_Zabbix监控mysql配置及故障告警配置
- ajax工具怎么安装,AJAX工具
- mongodb 查询效率_2020年9个好用的MongoDB 图形化界面工具
- mysql分页查询limit_MySQL查询语句(where,group by,having,order by,limit)
- Linux环境下配置Tomat