MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。

RabbitMQ介绍

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

四种交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

1. Direct Exchange

direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去.是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

配置:设置一个路由键

 public  static  final String QUEUE="queue";/*** direct 交换机模式*/@Beanpublic Queue queue(){return new Queue(QUEUE,true);}

发送服务:

@Service@Slf4jpublic class MQSender {@AutowiredAmqpTemplate amqpTemplate;public void send(Object message){String msg = (String) message;log.info("send msg"+message);amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);}}

接收服务:

@Service@Slf4jpublic class MQReceiver {//监听的queue@RabbitListener(queues = MQConfig.QUEUE)public void receive(String msg){log.info("receive msg "+msg);}}

测试:

 @Autowiredprivate MQSender sender;sender.send("hello direct Exchange");

2. Topic Exchange

按规则转发消息(最灵活) 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

路由键必须是一串字符,用句号(.) 隔开,

路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词, 井号(#)就表示相当于一个或者多个单词

配置类:

 public  static  final String TOPIC_QUEUE1="topic.queue1";public  static  final String TOPIC_QUEUE2="topic.queue2";public  static  final String ROUTING_KEY1="topic.key1";public  static  final String ROUTING_KEY2="topic.#";/*** Topic 交换机模式  可以用通配符*/@Beanpublic Queue topicQueue1(){return new Queue(TOPIC_QUEUE1,true);}@Beanpublic Queue topicQueue2(){return new Queue(TOPIC_QUEUE2,true);}@Beanpublic TopicExchange topicExchange(){return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);}@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);}

发送类:

 public void sendTopic(Object message){String msg = (String) message;log.info("send topic message"+msg);amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1");amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2");}

接收类:

@RabbitListener(queues = MQConfig.TOPIC_QUEUE1)public void receiveTopic1(String msg){log.info("receive topic1 msg "+msg);}

测试:

@Autowiredprivate MQSender sender;sender.sendTopic("hello topic Exchange");

3. Headers Exchange

设置header attribute参数类型的交换机,相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

  public static final String HEADER_EXCHANGE="headerExchange";/*** Header 交换机模式*/@Beanpublic HeadersExchange headersExchange(){return new HeadersExchange(HEADER_EXCHANGE);}@Beanpublic Queue headerQueue(){return new Queue(HEADER_QUEUE2,true);}// 绑定需要指定header,如果不匹配 则不能使用@Beanpublic Binding headerBinding(){Map<String,Object> map = new HashMap();map.put("header1","value1");map.put("header2","value2");return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();}
 public void sendHeader(Object massage){String msg = (String) massage;log.info("send fanout message: "+msg);MessageProperties properties = new MessageProperties();properties.setHeader("header1","value1");properties.setHeader("header2","value2");Message obj = new Message(msg.getBytes(),properties);amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);}

用MessageProperties来添加Header信息,然后与接收者的header比对。我都设置的是"header1","value1";"header2","value2"

 //监听 header模式的queue@RabbitListener(queues = MQConfig.HEADER_QUEUE2)//因为发送的是 byte 类型,所以接受也是该数据类型public void receiveHeader(byte[] message){log.info("header queue message"+new String(message));}

测试:

 @Autowiredprivate MQSender sender;sender.sendHeader("hello header Exchange");

4. Fanout Exchange

转发消息到所有绑定队列,消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

 public static final String FANOUT_EXCHANGE="fanoutExchange";/*** Fanout 交换机模式(广播模式),不用绑定key*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding fanoutBinding1(){return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());}@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());}
 public void sendFanout(Object massage){String msg = (String) massage;log.info("send fanout message: "+msg);amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);}

测试:

 @Autowiredprivate MQSender sender;sender.sendFanout("hello fanout Exchange");

补充

这个示例是基于springboot的示例。

pom依赖

<!--rabbbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置文件

 rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest# 这个账号密码只能连接本地的mq,远程的话需要配置virtual-host: /listener:simple:concurrency: 10max-concurrency: 10prefetch: 1 # 从队列每次取一个auto-startup: truedefault-requeue-rejected: true # 失败后重试

实现单机抢票系统

在这个项目里我用的是 springboot的2版本,ORM选用 JPA快速开发,JSON工具使用阿里的 fastjson,当然,mq用的是 rabbitMQ。导入的是 springboot集成的依赖。

1. 配置部分

1.1 pom.xml

  <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies>

1.2 application.properties

server.port=10000spring.datasource.url=jdbc:mysql://xxxxx/xxxxx?characterEncoding=utf-8spring.datasource.username=xxxspring.datasource.password=xxxxspring.datasource.driver-class-name=com.mysql.jdbc.Driverspring.jpa.properties.hibernate.hbm2ddl.auto=updatespring.jpa.show-sql=truespring.rabbitmq.host=localhostspring.rabbitmq.username=rootspring.rabbitmq.password=rootspring.rabbitmq.port=5672

我只是很有针对性的对 mq和 datasource进行了配置。

1.3 数据表

create table if not result(id int auto_increment primary key,ticket_id int null,user_id int null);create table if not exists ticket(id int auto_increment primary key,name varchar(255) null,content varchar(255) null,user_name varchar(20) null,count int default '6666' not null);

根据数据表可以Generate出JavaBean,不贴JavaBean了。 ##### 1.4 项目架构

├── src│   ├── main│   │   ├── java│   │   │   └── com│   │   │       └── fantj│   │   │           └── springbootjpa│   │   │               ├── AMQP.java│   │   │               ├── controller│   │   │               │   └── TicketController.java│   │   │               ├── mq│   │   │               │   ├── Message.java│   │   │               │   ├── MQConstants.java│   │   │               │   ├── MQReceiver.java│   │   │               │   └── MQSender.java│   │   │               ├── pojo│   │   │               │   ├── Result.java│   │   │               │   └── Ticket.java│   │   │               ├── repostory│   │   │               │   ├── ResultRepository.java│   │   │               │   └── TicketRepository.java│   │   │               └── service│   │   │                   ├── ResultServiceImpl.java│   │   │                   ├── ResultService.java│   │   │                   ├── TicketServiceImpl.java│   │   │                   └── TicketService.java│   │   └── resources│   │       ├── application.properties│   │       └── rebel.xml

2. 启动类

@SpringBootApplication@EntityScan("com.fantj.springbootjpa.pojo")@EnableRabbitpublic class AMQP {public static void main(String[] args) {SpringApplication.run(AMQP.class, args);}}

注意这个 @EnableRabbit注解,它会开启对rabbit注解的支持。

3. controller

很简单的一个controller类,实现查询和抢票功能。

@RestController@RequestMapping("/ticket")public class TicketController {@Autowiredprivate TicketService ticketService;@Autowiredprivate MQSender mqSender;@RequestMapping("/get/{id}")public Ticket getByid(@PathVariable Integer id){return ticketService.findById(id);}@RequestMapping("/reduce/{id}/{userId}")public String reduceCount(@PathVariable Integer id,@PathVariable Integer userId){Message message = new Message(id,userId);ticketService.reduceCount(id);mqSender.sendMessage(new Message(message.getTicketId(),message.getUserId()));return "抢票成功!";}}

注意 privateMQSendermqSender;这是我的 rabbit发送消息的类。

4. Service

接口我就不再这里贴出,直接贴实现类。

4.1 ResultServiceImpl.java

@Servicepublic class ResultServiceImpl implements ResultService{@Autowiredprivate ResultRepository resultRepository;@Overridepublic void add(Result result) {resultRepository.add(result.getTicketId(), result.getUserId());}@Overridepublic Result findOneByUserId(Integer userId) {return resultRepository.findByUserId(userId);}}

4.2 TicketServiceImpl.java

@Servicepublic class TicketServiceImpl implements TicketService{@Autowiredprivate TicketRepository repository;@Overridepublic Ticket findById(Integer id) {return repository.findTicketById(id);}@Overridepublic Ticket reduceCount(Integer id) {repository.reduceCount(id);return findById(id);}}

这两个都是很普通的service实现类,没有新加入的东西。

5. Dao

5.1 ResultRepository.java

@Repositorypublic interface ResultRepository extends JpaRepository<Result,Integer> {@Transactional@Modifying@Query(value = "insert into result(ticket_id,user_id) values(?1,?2) ",nativeQuery = true)void add(@Param("ticketId") Integer ticketId,@Param("userId") Integer userId);Result findByUserId(Integer userId);}5.2 TicketRepository.java@Repositorypublic interface TicketRepository extends JpaRepository<Ticket,Integer>{/***  减少库存*/@Transactional@Modifying@Query(value = "update ticket t set t.count=t.count+(-1) where id=?1",nativeQuery = true)int reduceCount(Integer id);/*** 查询信息*/Ticket findTicketById(Integer id);}

到了这里,你会发现,md哪里有用mq的痕迹...

6. MQ

剩下的全是mq的处理。

6.1 Message.java

这个类用来封装mq传输的消息对象,我们使用它来对传输的byte进行编解码,得到我们想要的数据。

@Datapublic class Message implements Serializable {private Integer ticketId;private Integer userId;public Message() {}public Message(Integer ticketId, Integer userId) {this.ticketId = ticketId;this.userId = userId;}}

6.2 MQConstants.java

这是一个常量类,用来定义和保存 queue的名字,虽然里面只有一个常量,好习惯要从小事做起。

public class MQConstants {public static final String QUEUE= "qiangpiao";}

6.3 MQSender.java

这是消息发送类,用来给queue发送数据。

@Service@Slf4jpublic class MQSender {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Message message){String msg = JSONObject.toJSONString(message);log.info("send message : "+msg);amqpTemplate.convertAndSend(MQConstants.QUEUE,msg);}}

AmqpTemplate是springboot框架提供给我们使用的amqp操作模板,利用它我们能更方便的调用和处理业务。 我们在Controller层调用它,来完成消息入队的操作,完成削峰和异步处理,大大增加了系统并发和强健性。

6.4 MQReceiver.java

这是消息接收类,用来从queue里获取数据。

@Service@Slf4jpublic class MQReceiver {@Autowiredprivate TicketService ticketService;@Autowiredprivate ResultService resultService;@RabbitListener(queues = MQConstants.QUEUE)public void receive(String message){log.info("receive msg : "+message);JSONObject jsonObject = JSONObject.parseObject(message);System.out.println(jsonObject);Message msg = JSONObject.toJavaObject(jsonObject, Message.class);Integer ticketId = msg.getTicketId();Integer userId = msg.getUserId();// 减库存Ticket ticket = ticketService.reduceCount(ticketId);if (ticket.getCount() <= 0){return;}// 判断是否已经抢过Result oneByUserId = resultService.findOneByUserId(userId);if (oneByUserId != null){return;}resultService.add(new Result(ticketId,userId));}}

在这个类中, @RabbitListener(queues=MQConstants.QUEUE)标记的是监听方法,该方法会从queue中获取到String数据。

之后我们需要将其复原为JavaBean,取出我们该要的属性,继续处理业务: 查询票剩余量-> 判断是否已抢到过-> 减库存 -> 增加抢票数据。 (我这里写的有点草率,应该先查余量...,不过不重要,本章重点在过一遍springboot与rabbitmq的整合)。

运行效果

我对该抢票功能做了一个9999请求,我本来做3k并发,电脑没那么多句柄,实现不了,最后做了1k并发的压测。

这是rabbitMQ 自带Managerment模板上的截图:

压测报告:

Server Software:        Server Hostname:        127.0.0.1Server Port:            10000Document Path:          /ticket/reduce/1/10Document Length:        13 bytesConcurrency Level:      1000Time taken for tests:   423.101 secondsComplete requests:      9999Failed requests:        0Total transferred:      1459854 bytesHTML transferred:       129987 bytesRequests per second:    23.63 [#/sec] (mean)Time per request:       42314.334 [ms] (mean)Time per request:       42.314 [ms] (mean, across all concurrent requests)Transfer rate:          3.37 [Kbytes/sec] receivedConnection Times (ms)min  mean[+/-sd] median   maxConnect:        0    2   6.8      0      29Processing:   217 40197 7390.7  41984   58488Waiting:      217 40197 7390.8  41984   58488Total:        246 40199 7384.8  41985   58488Percentage of the requests served within a certain time (ms)50%  4198466%  4267075%  4274480%  4275890%  4280195%  4282898%  4285099%  42868100%  58488 (longest request)

注意

  1. 本项目没有考虑线程安全的问题,事实上线程是不安全的,线程安全问题后面会说。

  2. 本项目只是为了mq的削峰和异步处理,最直观的就是数据库可以称住高并发,一般来讲,数据库连接这块是称不住的。

  3. mq在分布式下的问题后面会说。

SpringBoot整合RabbitMQ,实现单机抢票系统相关推荐

  1. SpringBoot实现12306自动抢票系统

    写在前面 前段时间在浏览开源社区的时候,不小心看到一个 12306抢票 系统,一下就被吸引住了,然后就动力歪念头?,过年终于不用找黄牛了,哇哈哈哈,写了差不多一个星期,终于可以全自动抢票了,中间遇到的 ...

  2. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  3. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  4. Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合

    一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...

  5. 12306抢票系统的NB解析

    每到节假日期间,一二线城市返乡.外出游玩的人们几乎都面临着一个问题:抢火车票! 虽然现在大多数情况下都能订到票,但是放票瞬间即无票的场景,相信大家都深有体会.尤其是春节期间,大家不仅使用12306,还 ...

  6. 腻害,高人都是这样玩SpringBoot整合RabbitMQ

    一.认识 RabbitMQ RabbitMQ 简介以 AMQP 协议: (1)RabbitMQ 是开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ 底层是用了 ...

  7. 12306抢票系统详解

    12306 抢票,极限并发带来的思考: 虽然现在大多数情况下都能订到票,但是放票瞬间即无票的场景,相信大家都深有体会. 尤其是春节期间,大家不仅使用 12306,还会考虑"智行"和 ...

  8. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  9. SpringBoot整合Dubbo+Zookeeper进行分布式搭建系统

    QUESTIONl:SpringBoot整合Dubbo+Zookeeper进行分布式搭建系统? ANSWER: 一:创建项目模块 1.1.创建一个Empty Project 名称:Dubbo 1.2. ...

最新文章

  1. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink
  2. 如何重装计算机操作系统,自己如何重装笔记本电脑操作系统呢?
  3. 在Html中使用Requirejs进行模块化开发
  4. 深度学习基础知识(一): 概述-神经网络起源和发展
  5. 网络:HTTP1.1和HTTP2区别
  6. Eclipse中实现SpringBoot与Mybatis整合(图文教程带源码)
  7. DataX在有赞大数据平台的实践
  8. 服务器升级中不能修改,windows10下更新服务器为何改不了了
  9. 快速查看Gradle项目的类库依赖情况
  10. 交叉熵【度量两个概率分布间的差异性信息】
  11. phpcmsV9 表单向导(案例一)应用示例
  12. 2014年第一季度总结报告
  13. rpm数据库异常问题总结
  14. 让“施工进度计划”真正产生价值——不是为了做计划而计划
  15. 大地测量学基础(复习)第一部分
  16. 可编程逻辑控制器类毕业论文文献都有哪些?
  17. 数据安全管理条例明确个人信息保护 360呼吁隐私保护重在企业
  18. PDF虚拟打印机有什么用?关于PDF虚拟打印机你要知道的事情都在这
  19. 信不信由你,反正我是信了!接龙啊。。。。。
  20. lae界面开发工具入门之介绍二--渲染组件篇

热门文章

  1. 安全测试工作规范文档,希望能帮到大家
  2. Linux setenforce命令详解[SeLinux操作]
  3. PHP基于thinkphp的网上书店管理系统#毕业设计
  4. java一个包有两个类_java一个包包含多个类 java 类包含类
  5. Linux系统配置(防火墙)
  6. 哲学家就餐问题(条件变量)
  7. docker启动容器指定端口和随机分配端口
  8. Vue使用html2Canvas导出pdf报Uncaught (in promise) Error: Element is not attached to a Document错误
  9. Microsoft Technical Fellows
  10. 【视频】视频播放(包含视频录制)的测试点总结