一、消息队列概述

1、什么是消息队列?

消息队列是消息中间件的一种实现方式。消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前的消息队列有:ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。本篇主要对activeMQ集成到SpringBoot项目做详解。

消息队列在实际应用中常用的使用场景有异步处理,应用解耦,流量削锋和消息通讯四个场景。

  • 异步处理
    传统的发消息接收消息做法有两种1.串行的方式;2.并行方式。
    下面会以Demo的形式呈现。

  • 应用解耦
    在spring cloud分布式微服务项目中,工单管理和设备管理分别是两个微服务,如果A工单被张三接单了,那么工单状态要设为已派单,检验员设为张三,设备状态要置为在检。
    传统的做法是,先调用工单管理的工单更新接口,成功之后再调用设备管理的设备更新接口,成功之后再返回操作提示给用户。这样做的缺点是应用耦合,如果在派单操作的时候正好设备管理微服务挂了或者阻塞了,那么派单操作就会失败或者要等待很长时间无反馈。另外如果设备管理的接口有变动,那么工单管理里面的代码也要改动。

    引入消息中间件,派单的时候,工单管理的工单更新接口处理好后把信息写入消息队列,然后直接返回操作反馈给用户。不管工单管理服务正不正常,正常就从消息队列里订阅消息处理,不正常就等待回复正常后再订阅消息处理。

  • 流量削峰
    XX公司的系统原来是针对A地区的客户开发的,现在为了抢占市场,拿下了B和C两个地区的客户,那么新系统上线,就存在如何把B和C的基础数据导入XX公司的系统中来的问题,短时间内要把庞大的旧数据改造适合新系统再导入进来,这很容易使系统挂掉,另外每天还有增量数据产生。
    这时可以引入消息中间件,B和C的客户只要负责把数据规则放到消息队列里就好了,XX公司可以有条不紊的从消息队列里订阅数据,可以有效缓解短时间内的高流量压力,但是这也对消息中间件的可靠性提出了要求。

  • 点对点通讯
    类似聊天室的功能。

2、消息队列传递模型形式

  • 点对点模式(PTP)

点对点模型的特点:

1,每个消息只有一个消费者

2,发送者和接受者没有时间依赖

3,接受者确认消息接受和处理成功

  • 发布-订阅(Pub/Sub)

即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

发布-订阅模型特点:

1,每个次消息可以有多个消费者

2,客户只有订阅后才能接收消息(只有建立订阅关系才可以接收消息 )

3,持久订阅和非持久订阅

  • 持久订阅
    订阅关系建立后(关系保存在消息中间件中),不管消费者(也就是订阅者)是否在线消息都不会消失。
  • 非持久订阅
    建立一种类似长连接关系式的订阅模式,订阅者为了接收消息必须保持一直连接的状态,如果断开连接则丢失消息。

二、项目集成ActiveMQ

1、在pom文件中加入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2、如果使用pool的话, 就需要在pom中加入以下依赖:

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.16.3</version>
</dependency>

3、yml配置文件加入:

server:port: 8080context-path: /pro
spring:activemq:user: adminpassword: xxxxx# 此处注意端口号不是之前的8161。broker-url: tcp://127.0.0.1:61616# 开启连接池(默认不开启)。pool:enabled: truemax-connections: 10queueName: publish.queue
topicName: publish.topic

下面是activeMQ可以赋值属性:

spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考虑结束之前等待的时间
#spring.activemq.close-timeout=15s
# 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
spring.activemq.in-memory=true
# 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 密码
spring.activemq.password=xxxx
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
spring.activemq.user=admin
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔列表(当不信任所有包时)
#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
# 连接空闲超时
#spring.activemq.pool.idle-timeout=30s
# 连接池最大连接数
#spring.activemq.pool.max-connections=1
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

4、Demo

①简单测试Demo

消息生产者:

import java.util.Map;import javax.jms.Destination;
import javax.jms.Queue;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;/*** 消息Producer* @author ray**/
@Component
//@EnableScheduling
public class AlarmProducer {// 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装@Autowiredprivate JmsTemplate jmsTemplate;
//    private JmsMessagingTemplate jmsTemplate;//    @Autowired
//    private Queue queue;//    @Scheduled(fixedDelay=5000) // 5s执行一次   只有无参的方法才能用该注解public void sendMessage(Destination destination, String message){//        jmsTemplate.convertAndSend(destinationName, payload, messagePostProcessor);this.jmsTemplate.convertAndSend(destination, message);}  // 双向队列// @JmsListener(destination="out.queue")     //   public void consumerMessage(String text){          //   System.out.println("从out.queue队列收到的回复报文为:"+text);      // }
}

②消息消费者:

import org.apache.commons.lang.StringUtils;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;/*** 消息Consumer* @author ray**/
@Component
public class AlarmConsumer {// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息  @JmsListener(destination = "mytest.queue")     // @SendTo("out.queue") 为了实现双向队列public void receiveQueue(String text) {  if(StringUtils.isNotBlank(text)){System.out.println("AlarmConsumer收到的报文为:"+text);  System.out.println("谢谢["+text+"]的浏览");System.out.println("谢谢["+text+"]的浏览");System.out.println("");}}
}

controller里写上测试接口

@Autowiredprivate AlarmProducer alarmProducer;@RequestMapping(value="/chufaxiaoxi",method=RequestMethod.GET)@ApiOperation(value="触发消息", notes="触发消息")@ApiImplicitParams({@ApiImplicitParam(name = "devicename", value = "name",example = "xxxx", required = true, dataType = "string",paramType="query"),})public String chufabaojing(String devicename){List<String> alarmStrList = new ArrayList<>();alarmStrList.add(devicename+"out fence01");alarmStrList.add(devicename+"out fence02");alarmStrList.add(devicename+"in fence01");alarmStrList.add(devicename+"in fence02");System.out.println("先生/女士"+devicename+"正在浏览");// 信息写入数据库System.out.println("浏览人员记录到数据库。。。");// 写入消息队列Destination destination = new ActiveMQQueue("mytest.queue");for (String alarmStr : alarmStrList) {alarmProducer.sendMessage(destination, alarmStr);}// 消息写进消息队列里就不管了// 下面两步骤移到activemq消费者里// 发送邮件// 发送短信return "success";}

有其他ActiveMQ问题可参考:https://blog.csdn.net/shuangzh115/article/details/50989182

SpringBoot集成ActiveMQ相关推荐

  1. SpringBoot集成ActiveMq消息队列实现即时和延迟处理

    原文链接:https://blog.csdn.net/My_harbor/article/details/81328727 一.安装ActiveMq 具体安装步骤:自己谷歌去 二.新建springbo ...

  2. activemq 开启监听_SpringBoot集成ActiveMQ怎么实现Topic发布/订阅模式通信?

    上一期我们讲了SpringBoot集成activeMQ实现Queue模式点对点通信,这一期我们接着讲SpringBoot集成activeMQ实现Topic发布/订阅模式通信. 发布/订阅模式通信是对点 ...

  3. Windows下安装Mongodb SpringBoot集成MongoDB和Redis多数据源

    全文内容: Mongodb安装 说明:Mongodb和redis是开发中常用的中间件,Redis的安装使用比较简单就不写了,只说本地也就是Windows安装Mongodb. SpringBoot集成M ...

  4. SpringBoot集成FreeMarker

    给大家简单介绍一下springboot 集成FreeMarker 过程很简单,5分钟即可. 首先在项目中增添依赖spring-boot-starter-freemarker pom文件代码如下: &l ...

  5. springboot集成swagger2测试接口

    springboot集成swagger2测试接口 1.需要的依赖 2.开始编写一个swagger2 3.演示效果图片 1.需要的依赖 <dependency><groupId> ...

  6. springboot 集成logback

    springboot 集成logback 1.application.properties配置文件指定logback.xml logging.config=classpath:logback.xml ...

  7. springboot集成swagger2多模块中文配置详细步骤,解决集成mybatis或mybatis-plus无法正常使用问题

    springboot集成swagger2多模块中文配置详细步骤,解决集成mybatis或mybatis-plus无法正常使用问题 参考文章: (1)springboot集成swagger2多模块中文配 ...

  8. SpringBoot集成Quartz(解决@Autowired空指针Null问题即依赖注入的属性为null)

    SpringBoot集成Quartz(解决@Autowired空指针Null问题即依赖注入的属性为null) 参考文章: (1)SpringBoot集成Quartz(解决@Autowired空指针Nu ...

  9. SpringBoot集成全局异常处理

    2019独角兽企业重金招聘Python工程师标准>>> SpringBoot集成全局异常处理 前言 对于通常的MVC项目,大量的异常需要我们去处理,如此一来,我们的 Controll ...

最新文章

  1. 利用ViewPager+Fragment+actionbar实现可左右滑动的Action Tab
  2. 面试时,当你有权提问时,别客气,这是个逆转的好机会(内容摘自Java Web轻量级开发面试教程)...
  3. c-nominated = ((rcheck-use_candidate) || c-nominated);
  4. Binary Tree Level Order Traversal II 解题思路
  5. R开发(part12)--基于RC的面向对象编程
  6. 利用FindWindow和SendMessage进程通信
  7. MAMP升级mysql5.6到5.7
  8. Android Button监听的方式
  9. 10年软件开发教会我最重要的10件事[转]
  10. 手游方舟怎么输入代码_明日方舟再次登顶失败,为了不发十连奖励,鹰角实力控分?...
  11. 警惕“***性社工”现象
  12. 用argparse解析布尔值
  13. java中判断字符串是否为纯数字
  14. 0x0000006B蓝屏解决方法
  15. 用切片实现一个简陋的Map
  16. c语言中如何用sqar函数,简易函数信号发生器设计_毕业论文.doc
  17. 科研成果 | 信道模型 | 原理及随机数仿真 | 均匀、正态、双高斯、瑞利、莱斯、对数正态、nakagami、Suzuki分布的随机数仿真(matlab)
  18. laravel faker 数据填充 中文数据填充 单元测试数据填充 数据的类型有哪些
  19. 2023-2029年中国无线城市建设行业发展形势分析及投资规划分析报告
  20. 计算机开机卡在进入桌面的时候,电脑启动时卡在“正在启动WINDOWS”界面如何处理...

热门文章

  1. 内蒙古计算机软件考试时间,2018上半年内蒙古计算机软件水平考试报名工作通知...
  2. 重建大师 | 5.1.2版本具体说明
  3. C++——std::async和std::thread
  4. 山工kw什么意思_山工是什么意思和拼音怎么读
  5. 偏差(Bias)与方差(Variance)详解
  6. Mixly图形化编程四轴飞行器飞控程序
  7. 流体机械及工程计算机应用,流体机械及工程
  8. 西门子840d备份到u盘_西门子840D数控系统硬盘数据恢复法
  9. HDU 2547 无剑无我 水水。。
  10. 饥荒自建服务器怎么换人物,饥荒怎么更换人物 饥荒换服务器方法大全