文章目录

  • SpringMQ的使用
    • 1、windows安装
    • 2、添加依赖:
    • 3、增加rabbit的配置:
    • 4、最简单的测试:publisher--->MQ-->consumer
      • 4.1、建立连接配置:
      • 4.2、发送端:建立连接,获取通道,创建队列,准备消息,发送消息到队列:
      • 4.3 、接收端:建立连接,获取通道,声明队列,申请队列的一个消费者(内含监听消息的方法),在通道线路上接收消息。
    • 4.4 消息接收的手动确认:
    • 5、work消息模型
      • 5.1 消息发送:
      • 5.2 消息接收:
    • 6、订阅模型分类:
        • 6.1 fanout:也就是交换机广播消息
          • 6.1.1 发送消息:建立连接,获取通道,声明交换机,发送消息到交换机
          • 6.1.2 接受消息:建立连接,获取通道,声明队列,绑定到交换机,定义消费者(包含监听),监听通道。
        • 6.2 direct:相当于定向投放
          • 6.2.1 发送消息: 交换机类型指定为direct,发送消息时指定routing key
          • 6.2.2 接收消息: 获取通道,声明队列,绑定交换机,指定routing key
        • 6.3 Topic:交换机采用通配符的方式和队列匹配
          • 6.3.1 发送消息: 指定交换机类型,发送消息携带route key
          • 6.3.2 接收消息: 声明队列,绑定交换机携带带通配符的route key
      • 7、持久化
        • 7.1 交换机持久化
        • 7.2 队列持久化
        • 7.3 消息持久化
      • 8、spring和MQ整合
        • 8.1 设置接收消息的监听器: 通过注解@RabbitListener实现,绑定队列,交换机,和route-key
        • 8.2接收消息:通过AmqpTemplate实现

SpringMQ的使用

1、windows安装

安装erlang,配置环境变量,安装rabbitmq-server

访问:http://localhost:15672/

2、添加依赖:

     <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3、增加rabbit的配置:

spring:rabbitmq:host: 127.0.0.1username: rootpassword: 880808virtual-host: /

4、最简单的测试:publisher—>MQ–>consumer

4.1、建立连接配置:

public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("12345678");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}

4.2、发送端:建立连接,获取通道,创建队列,准备消息,发送消息到队列:

public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}

4.3 、接收端:建立连接,获取通道,声明队列,申请队列的一个消费者(内含监听消息的方法),在通道线路上接收消息。

public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}

4.4 消息接收的手动确认:

        DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);

5、work消息模型

5.1 消息发送:

public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}

5.2 消息接收:

6、订阅模型分类:

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange类型有以下几种:

​ Fanout:广播,将消息交给所有绑定到交换机的队列

​ Direct:定向,把消息交给符合指定routing key 的队列

​ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

我们这里先学习

​ Fanout:即广播模式

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

6.1 fanout:也就是交换机广播消息

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
6.1.1 发送消息:建立连接,获取通道,声明交换机,发送消息到交换机
public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello everyone";// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生产者] Sent '" + message + "'");channel.close();connection.close();}
}
6.1.2 接受消息:建立连接,获取通道,声明队列,绑定到交换机,定义消费者(包含监听),监听通道。
//消费者1
public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}

6.2 direct:相当于定向投放

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

6.2.1 发送消息: 交换机类型指定为direct,发送消息时指定routing key

发送消息的RoutingKey分别是:insert、update、delete

public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "商品新增了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}
6.2.2 接收消息: 获取通道,声明队列,绑定交换机,指定routing key
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

6.3 Topic:交换机采用通配符的方式和队列匹配

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

6.3.1 发送消息: 指定交换机类型,发送消息携带route key
public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "新增商品 : id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服务:] Sent '" + message + "'");channel.close();connection.close();}
}
6.3.2 接收消息: 声明队列,绑定交换机携带带通配符的route key
public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

7、持久化

7.1 交换机持久化

7.2 队列持久化

7.3 消息持久化

8、spring和MQ整合

8.1 设置接收消息的监听器: 通过注解@RabbitListener实现,绑定队列,交换机,和route-key

@Component
public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "true"),exchange = @Exchange(value = "spring.test.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"#.#"}))public void listen(String msg){System.out.println("接收到消息:" + msg);}}

8.2接收消息:通过AmqpTemplate实现

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void testSend() throws InterruptedException {String msg = "hello, Spring boot amqp";this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);// 等待10秒后再结束Thread.sleep(10000);}
}

SpringMQ的使用相关推荐

  1. java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费

    摘选:https://my.oschina.net/u/3613230/blog/1457227 摘要: 最近在项目开发中,需要用到activemq,用的时候,发现在同一个项目中point-to-po ...

  2. spring整合使用activemq

    activemq简单的demo这里就不做演示了,今天介绍一下如何利用spring整合activemq,也是实际工作中涉及到的,希望对各位伙伴有所帮助, 1.安装activemq,为演示方便,我已经提前 ...

  3. ActiveMQ死信产生的原因及使用方案

    DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息. 出现以下情况时,消息会被redelivered A transacted session is used and ...

  4. ActiveMQ的简单使用

    转:http://wosyingjun.iteye.com/blog/2314681 ActiveMQ的简单使用 ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程 ...

  5. sheng的学习笔记-activeMQ框架原理

    目录 搭建环境 如果需要修改访问端口的话 如果需要修改用户名和密码的话 重启 ActiveMQ 配置文件 activemq.xml 基础理论: 什么是 JMS JMS基本开发步骤: 点对点模型(Poi ...

最新文章

  1. 聪明人的游戏(3)镜子颠倒了什么?
  2. python自学需要多久-自学Python多久能找到工作
  3. c语言程序构造数据类型问题,C语言程序设计课程课件之第四章简单构造数据类型.ppt...
  4. Visual Studio 2008 断点调试直接跳出代码窗口
  5. mysql8导出文件_windows下 Mysql 8.0.x 数据库简单的导出和导入!!!
  6. 原生js 基于canvas写一个简单的前端 截图工具
  7. 新冠隔离让你家Wifi变慢?全球网络大塞车AI缓拥堵,边缘计算或成杀手锏
  8. java word另存为_Java 网页html转为word并保存为doc文件
  9. mysql测试什么鬼,where 1=1 是什么鬼?SQL中有这玩意?
  10. APP 上传之后出现invalid binary 问题解决汇总
  11. 用組件封裝數據庫操作(一)
  12. 20155338《网络对抗》Web安全基础实践
  13. 银河麒麟系统查看网络设置命令_银河麒麟配置说明
  14. 今日头条信息流 - 基础账户实操
  15. 材料工程计算机技术应用,计算机在材料工程中的应用
  16. iOS 强制屏幕实现旋转功能
  17. 2021高考厦门科技中学成绩查询,2021年厦门重点高中名单及排名,厦门高中高考成绩排名榜...
  18. linux shell题库,shell习题-30
  19. 错误SyntaxError: (unicode error) truncated \UXXXXXXXX escape的解决方法
  20. ArtWork.Conversion.

热门文章

  1. 关于TP框架的微信开发服务器配置TOKEN验证失败解决方案
  2. 拷贝依赖_还不懂零拷贝(Zero-Copy)?怎么称得上高级程序员
  3. 安徽大学计算机教学平台c语言作业,安徽大学计算机基础C语言选择题
  4. sql a 表 若包含b表 则a 表 列显示_几道常见的SQL面试题,看你能答对几道?
  5. java课程总结_java课程总结报告.doc
  6. zabbix mysql监控告警_Zabbix监控mysql配置及故障告警配置
  7. ajax工具怎么安装,AJAX工具
  8. mongodb 查询效率_2020年9个好用的MongoDB 图形化界面工具
  9. mysql分页查询limit_MySQL查询语句(where,group by,having,order by,limit)
  10. Linux环境下配置Tomat