同步调用的优点:时效性较强,可以立即得到结果

同步调用的问题
1.耦合度高
2.性能和吞吐能力下降
3.有额外的资源消耗
4.有级联失败问题

异步通信的优点:
1.耦合度低
2.吞吐量提升
3.故障隔离
4.流量削峰

异步通信的缺点:
1.依赖于Broker的可靠性、安全性、吞吐能力
2.架构复杂了,业务没有明显的流程线,不好追踪管理

什么是AMQP
应用间消息通信的一种协议,与语言和平台无关。

SpringAMQP如何发送消息?

1.引入amqp的starter依赖
2.配置RabbitMQ地址
3.利用RabbitTemplate的convertAndSend方法

1.在父工程中引入spring-amqp的依赖

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

1.在publisher服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.64.131  #rabbitMQ的ip地址port: 5672  #端口username: itcastpassword: 123321virtual-host: / #虚拟主机

2.在publisher服务中新建一个测试类,编写测试方法:

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName="simple.queue";//队列名称String message="hello,spring amqp";//消息rabbitTemplate.convertAndSend(queueName,message);}
}

成功发送消息

SpringAMQP如何接收消息?

1.引入amqp的starter依赖
2.配置RabbitMQ地址
3.定义类,添加@Component注解
类中声明方法,添加@RabbitListener注解,方法参数就时消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

1.在consumer中编写消费逻辑,监听simple.queue

1.在consumer服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.64.131  #rabbitMQ的ip地址port: 5672  #端口username: itcastpassword: 123321virtual-host: /

2.在consumer服务中新建一个类,编写消费逻辑:

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到simple.queue:"+msg);}
}

模拟WorkQueue,实现一个队列绑定多个消费者

1.在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
2.在consumer服务中定义两个消息监听者,都监听simple.queue队列
3.消费者1每秒处理50条消息,消费者2每秒处理10条消息

1.生产者循环发送消息到simple.queue

在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列

    @Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName="simple.queue";//队列名称String message="hello,message--";//消息for (int i = 1; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}

2.编写两个消费者,都监听simple.queue

在consumer服务中添加一个消费者,也监听simple.queue:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:"+msg+ LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2(String msg) throws InterruptedException {System.err.println("消费者2222接收到消息:"+msg+LocalTime.now());Thread.sleep(200);}

3.消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限

spring:rabbitmq:host: 192.168.64.131  #rabbitMQ的ip地址port: 5672  #端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

开始测试

发布( Publish )、订阅( Subscribe

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。exchange负责消息路由,而不是存储,路由失败则消息丢失

常见exchange类型包括:
Fanout:广播
Direct:路由
Topic:话题

发布订阅-Fanout Exchange
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue

发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

发布订阅-TopicExchange​​​​​​​
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

1.利用SpringAMQP演示FanoutExchange的使用

1.实现思路如下:

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
3.在publisher中编写测试方法,向itcast.fanout发送消息

2.在consumer服务声明Exchange、Queue、Binding

在consumer服务创建一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:

@Configuration
public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明第1个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明第2个队列@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2和交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

3.在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("消费者接收到fanout.queue1:"+msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("消费者接收到fanout.queue2:"+msg);}

4.在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

    @Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName="itcast.fanout";//消息String message="hellow every one";//发送rabbitTemplate.convertAndSend(exchangeName,"",message);}

5.重启consumer测试

2.利用SpringAMQP演示DirectExchange的使用

实现思路如下:
1.利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3.在publisher中编写测试方法,向itcast. direct发送消息

1.在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

    @RabbitListener(bindings =@QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listerDirecrtQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:"+msg);}@RabbitListener(bindings =@QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listerDirecrtQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:"+msg);}

2.在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

    @Testpublic void testSendDirectExchange(){//交换机名称String exchangeName="itcast.direct";//消息String message="hellow blue";//发送rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

只有Routingkey含blue的队列才能接收到此消息

描述下Direct交换机与Fanout交换机的差异?

Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.利用SpringAMQP演示TopicExchange的使用

实现思路如下:
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3.在publisher中编写测试方法,向itcast. topic发送消息

1.在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

    @RabbitListener(bindings =@QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listerTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:"+msg);}@RabbitListener(bindings =@QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listerTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:"+msg);}

2.在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

    @Testpublic void testSendTopicExchange(){//交换机名称String exchangeName="itcast.topic";//消息String message="你好我好大家好";//发送rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

两个队列都能接收到消息

4.SpringAMQP-消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

1.测试发送Object类型消息

1.我们在consumer中利用@Bean声明一个队列:

    @Beanpublic Queue ObjectQueue(){return new Queue("object.queue");}

2.在publisher中发送消息以测试:

    @Testpublic void testSendObjectQueue() {Map<String,Object> msg=new HashMap<>();msg.put("name", "坏女人");msg.put("age", 18);rabbitTemplate.convertAndSend("object.queue",msg);}

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

2.如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

1.我们在父工程引入依赖

        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

2.我们在publisher服务声明MessageConverter:

    @Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

3.接收消息

1.我们在consumer服务定义MessageConverter:

    @Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}

2.然后定义一个消费者,监听object.queue队列并消费消息:

    @RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息"+msg);}

重启服务

SpringAMQP发送与接收消息相关推荐

  1. 【译】 WebSocket 协议第六章——发送与接收消息(Sending and Receiving Data)

    概述 本文为 WebSocket 协议的第六章,本文翻译的主要内容为 WebSocket 消息发送与接收相关内容. 发送与接收消息(协议正文) 6.1 发送数据 为了通过 WebSocket 连接发送 ...

  2. springboot websocket发送和接收消息代码资源下载

    客户端发送给服务端的消息,同时接收服务端响应给客户端的消息: 服务端收到客户端发送过来的消息,并响应给客户端消息:

  3. ActiveMQ 发送和接收消息

    一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...

  4. java mqtt客户端_java 实现mqtt发送和接收消息客户端具体用法及测试代码

    注:客户端代码实现请看我的上一篇 1mqtt发送消息 发送时不用多讲,每次发送肯定需要运行一次发送消息方法 MyMqttClient mqttClient = new MyMqttClient(); ...

  5. 通过kafka发送和接收消息

    生产者配置类: @Configuration @EnableKafka public class KafkaProducerConfig {@Value("${kafkaConfig.add ...

  6. PC微信逆向:发送与接收消息的分析与代码实现

    文章目录 定位微信的消息接收函数 定位消息接收函数的相关思路 定位消息内容的地址 分析接收消息函数 好友消息 群消息 总结 代码实现 定位微信的消息发送函数 定位消息发送函数的相关思路 过滤当前聊天窗 ...

  7. Netty:实现同步发送并接收消息的一种方式

    Netty创建通信服务时使用Nio异步通信, 配置代码(bootstrap.channel(NioSocketChannel.class);),要怎样实现这样一个同步发送消息并接收消息功能,虽然这样做 ...

  8. 游戏对象之间发送和接收消息

    1.创建一个游戏工程, 命名为SRMessageGo 2.在Project视图中创建3个文件夹, Scene文件夹.Resources文件夹和Script文件夹 3.将当前场景保存为GameScene ...

  9. metaq发送和接收消息demo

    为什么80%的码农都做不了架构师?>>>    一.maven依赖 <dependency><groupId>com.taobao.metamorphosis ...

最新文章

  1. 什么是面向对象(OOP)
  2. opencv轮廓及点在轮廓内判断
  3. 【云计算】5_云存储产品介绍
  4. VTK:图片之ImageGridSource
  5. 关于mysql数据库的备份和还原
  6. mysql群集配置_mysql8 参考手册-NDB群集配置参数,选项和变量概述
  7. 全球首款采用离心风扇/90Hz刷新率的电竞手机红魔3发布
  8. 人工智能将是人类最后的需要 | 大咖来了
  9. 一文超详细讲解文本风格迁移
  10. 服务器虚拟化可以节约成本吗?—硬件开支篇
  11. 20155213 实验三《敏捷开发与XP实践》实验报告
  12. 优先深度搜索判断曲线相交_深度优先搜索(Depth-first search)是如何搜索一张图的?...
  13. wifi抓包/苹果电脑mac book抓wifi sniffer packet
  14. spss软件测试题题库,spss期末考试试题及答案
  15. 麻将 java_怎么用java做麻将游戏
  16. Since Due to Because of Because 的用法和区别
  17. 全新的服务器debian/ubuntu---校准时间、更新apt,设置ssh远程访问
  18. 游戏原画教程:角色设计中的几个基本图形的用法
  19. 淘宝、拼多多、京东等购物平台的优惠券公众号免费搭建持续更新
  20. Tutorial 05: Synching Video

热门文章

  1. BJFU_数据结构习题_257统计字符出现的频度
  2. STM32F103C8T6控制电机驱动模块298N驱动电机调速以及正反转(附代码资源包)
  3. 【python】把某一列从文本转换成数值
  4. 股市基础知识、内盘和外盘与股票走势
  5. 【ESP系列】ESP8266-12F
  6. java解析网页全过程_Web页面的解析过程
  7. 多多情报通:拼多多修改关键词会降权吗?有什么影响?
  8. 拼多多商品详情页 API接口、拼多多商品SKU数据接口 API接口、拼多多关键词搜索接口 API接口 API接口、拼多多关键词采集 API接口、拼多多采集接口 API接口、拼多多详情 API接口
  9. Hard resetting via RTS pin...
  10. stm32 智能避障小车(二)之sg90