RabbitMQ的基本概念以及绑定策略的简单演示
首先先来了解一下两个名词
JMS
Java Message Service,Java定义的一套消息服务标准,符合JMS标准规范的,都是通用的Java消息服务
MOM
Message Oriented Middleware,面向消息的中间件符合消息开发标准规范的中间件产品,例如ActliveMQ、RabbitMQ、Kafka等。可以提供消息存储机制、提供消息的发送和消费服务,提供消息的缓存处理等功能的中间件产品。符合MOM规范的产品,同时可以依托JMS标准规范访问的产品,可以称为JMS Provider
MQ的好处
解耦
在多个组件之间原本进行网络调用的方式现在换成MQ的方式来进行消息的异步通讯,倘若一个消费端系统下线,影响也仅仅是消息积压在MQ中没有被消费而已
可恢复性
异步通信
消息队列提供了异步处理机制,能够很好的提升用户的体验度(订单系统为例,用户下订单后,订单系统直接返回下订单成功的结果,然后将数据封装到MQ中,最后进入数据库)
峰值处理
MQ能够使关键组件在访问高峰时顶住巨大的压力(订单系统为例,订单会被存储到MQ队列中,不会直接访问消费端,消费端通过拉取的方式控制处理速度,使流量趋于平稳,达到了削峰填谷的目的)
扩展性
由于组件之间的耦合度很低,所以增大消息入队和处理频率是很方便的
RabbitMQ的原理
Broker
接受客户端连接,实现AMQP消息队列和路由功能的进程(就是我们启动的RabbitServer)
Vhost
虚拟主机,一个VH里面可以有多个Exchange和Queue。当多个不同用户使用同一个RabbitMQ服务(Broker)时,会划分出多个VH
Exchange
接受发送方的消息,根据Binding规则将消息路由到不同的Queue中
Queue
Channel
由于大量建立TCP连接不现实,所以AMQP(高级消息队列协议)规定:消息都必须经过信道发送出去
绑定策略
Direct
是一种点对点的交换器,发送方发送消息到MQ中,MQ的direct交换器接收到消息后,会根据Routiong Key来决定消息将会被发送到哪一个队列中;而接收方则需要负责监视一个队列(通过注册队列监听器),当队列状态发生变化时消费消息。注册队列监听器需要提供交换器信息、队列信息和路由键信息
发送方
yaml
server:port: 8282spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
direct
@Component public class DirectMessagePush {/*** 注入RabbitMQ逻辑模板*/@Resourceprivate AmqpTemplate amqpTemplate;public void sendMessage(Order order) {/*** order-exchange 具体的交换器* order 路由键*/this.amqpTemplate.convertAndSend("order-exchange", "order", order);}}
controller
@Controller public class Send {@Resourceprivate DirectMessagePush directMessagePush;@GetMapping("/order")@ResponseBodypublic String sendOrder() {Order order = new Order();order.setId(1);order.setTotalPrice(18000.00);List<Item> items = new ArrayList<>();for (int i = 0; i < 5; i++) {Item item = new Item();item.setId(1 + i);item.setPrice(2000.00 + i * 200);item.setProductName("Apple 16");item.setRemark("2019 16寸");items.add(item);}order.setItems(items);this.directMessagePush.sendMessage(order);return "SUCCESS !";}}
接收方
yaml
server:port: 8181spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /listener:direct:retry:enabled: truemax-attempts: 10
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- AMQP插件组,用于开发spring boot访问符合AMQP协议的MQ产品的依赖启动器 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
consumer
/*** @RabbitListener RabbitMQ的监听器类* bindings 绑定策略* @QueueBinding 声明具体的绑定策略* value 具体绑定的队列* exchange 队列对应的交换器* key 绑定的具体路由键* @Queue 具体的队列描述* name 队列名称,消费者关注,发布者不关注* autoDelete* “true” 当队列没有被任何消费者监听的时候,RabbitMQ自动删除该队列* “false” 只要队列创建,永不删除,RabbitMQ保存未被消费的消息等待其它消费监听者处理* @Exchange 具体的交换器* name 交换器名称* autoDelete 当没有队列与交换器绑定时,是否删除该交换器* type 交换器的类型*/ @RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "order-queue", autoDelete = "false"),exchange = @Exchange(name = "order-exchange", type = "direct", autoDelete = "false"),key = "order") }) @Component public class OrderMassage {/*** @param order 消息* @RabbitHandler 标记当前方法是消费消息的方法* 该方法将会被会注册到MQ上,监听MQ的队列,当队列中出现消息的时候自动消费* <p>* 消费端方法不能有返回值!*/@RabbitHandlerpublic void doSomething(Order order) {/*** 相关业务*/System.out.println("order = ------------------>>> " + order);}}
Fanout
广播交换器。将接收到的消息广播发送到绑定匹配的所有队列中,这个过程交换器不会匹配Routing Key,所以消息中不需要提供路由键信息;接收方则需要负责监视一个队列(通过注册队列监听器),当队列状态发生变化时消费消息。注册队列监听器需要提供交换器信息、队列信息
发送方
yaml
server:port: 8282spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
fanout
@Component public class FanoutPublisher {@Resourceprivate AmqpTemplate amqpTemplate;public void sendStr(String str) {// 中间占位参数不能缺省!this.amqpTemplate.convertAndSend("fanout-exchange", "", str);}}
controller
@Controller public class Send {@Resourceprivate FanoutPublisher fanoutPublisher;@GetMapping("/fanout")@ResponseBodypublic String sendStr() {this.fanoutPublisher.sendStr("广播通知:... ...");return "OK";}}
接收方
yaml
server:port: 8181spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /listener:direct:retry:enabled: truemax-attempts: 10
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- AMQP插件组,用于开发spring boot访问符合AMQP协议的MQ产品的依赖启动器 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
consumer1
/*** 广播队列的消费者1*/ @RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "log-fanout1", autoDelete = "false"),exchange = @Exchange(name = "fanout-exchange", autoDelete = "false", type = "fanout")) }) @Component public class Person1 {@RabbitHandlerpublic void fanoutHandler(String srt) {System.out.println("srt1 ----------------------------> " + srt);}}
consumer2
/*** 广播队列的消费者2*/ @RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "log-fanout2", autoDelete = "false"),exchange = @Exchange(name = "fanout-exchange", autoDelete = "false", type = "fanout")) }) @Component public class Person2 {@RabbitHandlerpublic void fanoutHandler(String srt) {System.out.println("srt2 ----------------------------> " + srt);}}
Topic
主题交换器。也称之为规则匹配交换器。通过自定义的匹配规则来决定消息存储到哪些队列中,MQ中的交换器会根据Routing Key来决定消息应该发送到某具体队列;接收方则需要负责监视一个队列(通过注册队列监听器),当队列状态发生变化时消费消息。注册队列监听器需要提供交换器信息、队列信息和路由键信息
发送方
yaml
server:port: 8282spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
topic
/*** 发送日志消息*/ @Component public class LogPublisher {@Resourceprivate AmqpTemplate amqpTemplate;public void sendLog(String log) {Random random = new Random();int num = random.nextInt(10000);String routingKey = "";if (num % 5 == 0) {routingKey = "char.log.info";}if (num % 5 == 1) {routingKey = "char.log.warn";}if (num % 5 == 2) {routingKey = "char.log.error";} else {routingKey = "char.log.char";}System.out.println("routingKey --------------------> " + routingKey);this.amqpTemplate.convertAndSend("topic-exchange", routingKey, log);}}
接收方
yaml
server:port: 8181spring:rabbitmq:host: 192.168.49.142port: 5672username: guestpassword: guestvirtual-host: /listener:direct:retry:enabled: truemax-attempts: 10
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- AMQP插件组,用于开发spring boot访问符合AMQP协议的MQ产品的依赖启动器 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
consumer1
/*** 消费所有级日志*/ @RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "topic-log-all", autoDelete = "false"),exchange = @Exchange(name = "topic-exchange", autoDelete = "false", type = "topic"),key = "*.log.*") }) @Component public class All {@RabbitHandlerpublic void logHandler(String info) {System.out.println("all --------------------------->" + info);}}
consumer2
/*** 消费error级日志*/ @RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "topic-log-error", autoDelete = "false"),exchange = @Exchange(name = "topic-exchange", autoDelete = "false", type = "topic"),key = "*.log.error") }) @Component public class Error {@RabbitHandlerpublic void logHandler(String error) {System.out.println("error --------------------------->" + error);}}
consumer3
…
consumer4
…
注意事项:开发消息消费端是时候,消费方法不能带有返回值(也就是说@RabbitHandler描述的方法不具备返回值)
RabbitMQ的基本概念以及绑定策略的简单演示相关推荐
- 通过实例理解 RabbitMQ 的基本概念
先说下自己开发的实例. 最近在使用 Spring Cloud Config 做分布式配置中心(基于 SVN/Git),当所有服务启动后,SVN/Git 中的配置文件更改后,客户端服务读取的还是旧的配置 ...
- AngularJS学习笔记之directive—scope选项与绑定策略
From:http://www.linuxidc.com/Linux/2015-05/116924.htm scope:{}使指令与外界隔离开来,使其模板(template)处于non-inherit ...
- c++switch实现猜拳_策略模式+简单工厂+注解消除 if-else/switch-case
消除代码中的 if-else/switch-case 在很多时候,我们代码中会有很多分支,而且分支下面的代码又有一些复杂的逻辑,相信很多人都喜欢用 if-else/switch-case 去实现.做的 ...
- 程序化交易系统主观辅助交易策略编写和演示 及文华tb单个品种指数合成方法
程序化交易系统主观辅助交易策略编写和演示 def tick_zhishu(ls):"""从数据库中读取合约后,合成约指数最新价,用于合成指数合约k线,采用持仓量加权:pa ...
- 外汇交易市场策略:简单最常用的四周规则与七种交易策略
简单常用四周规则 随着越来越复杂.越来越富于想象力的外汇交易系统和外汇指标的出现,外汇投资者们往往忽视了那些简单.基本的工具,而它们的效果相当好,经受住了时间的考验.今天,为大家分享其中一种最简便的外 ...
- 跨服务同步数据(MYSQL),@Scheduled定时任务,HttpClient分批发送数据,JSONobject,策略模式+简单工厂,异步@Async+CompletableFuture使用
目录 1.实现远程post请求 下面首先创建HttpClient用来实现远程post请求 2.发送数据 接下来就是将获取的数据(SyncDataParam )通过http请求方式发送给另一个服务(ap ...
- 策略与简单工厂模式结合的实现--收银软件的代码及UML图
策略模式和简单工厂模式的结合:把分支判断放到环境角色中. 解决简单工厂模式中提到的问题: ●关键:分支的switch依然去不掉 ●解决方法:依赖注入.反射.XML 简单工厂模式 策略模式 收银软件的策 ...
- 策略模式+简单工厂之旅游出行策略与门票折扣案例
文章目录 策略模式+简单工厂 (1)旅游出行策略(飞机,高铁,大巴,骑行,徒步至少两种出行方式) (2)门票折扣 策略模式+简单工厂 (1)旅游出行策略(飞机,高铁,大巴,骑行,徒步至少两种出行方式) ...
- 量化投资-基本面组合-10大基本面策略的简单组合
策略: 1.从上面介绍的10几个策略中,选出来10个策略,每个品种使用50万资金,每次交易一手 2.手续费万一,3元或者10元每手,滑点各一跳 3.从2008年1月1日或者开始上市日期最近的开始回测 ...
最新文章
- 超级全面的 SpringBoot 注解介绍,每一个用途都应该清晰
- 分类器可视化解释StylEx:谷歌、MIT等找到了影响图像分类的关键属性
- Nginx防盗链详细设置
- jsp页面,在浏览器端显示时会出现乱码解决方法
- 社区运营破冰也有三大原则八项注意“了,你造吗?
- Linux下的第一个驱动程序
- linux c 数据库访问框架,linux c 开发通用结构,框架
- mysq由于主键冲突导致主从不同步
- 交通仿真软件测试自学,[2018年最新整理]各类交通仿真软件综合介绍.docx
- Hadoop HIVE 条件控制函数
- 教你win10系统无法识别语音识别的解决方法
- Read Asia Embedded fell
- 36. PHP面向对象
- .NET程序员应掌握的常用类库
- 服务器主板支持nvme,给老主板刷上一个加入支持NVMe模块的改版“BIOS”
- nginx 解决 405 not allowed错误
- MySQL常见错误:Starting MySQL...The server quit without updating PID file (/usr/local/mysql/data/localhos
- Python直角坐标系画图
- 软件定义的网络(中)
- qbittorrent 等待_qBittorrent下载BT电影教程