Activemq实战
文章目录
- 消息发布模式和接口
- JMS发布消息模式
- JMS API
- 点对点模式
- 同步消费
- 异步消费
- 总结
- 发布/订阅模式
- 订阅者类型
- 点对点模式与发布订阅模式比较
- JMS
- 消息的可靠性和持久性
- 设置方式
- 生产者和消费者事务及ACK
- 事务
- 设置方式
- 总结
- ACK
- 非事务模式下ACK
- 事务模式下ACK
- 总结
- spring 整合 Activemq
- spring framework
- spring boot
- mqtt
- 高级特性
- 异步投递及确认
- 延迟投递和定时投递
- 消费重试机制
- 重新发送消息情况
- Java配置
- Spring配置
- sprint boot配置
- 死信队列
- 防止重复调用
- Activemq应用场景
- 参考
- JMS和AMQP
消息发布模式和接口
JMS发布消息模式
点对点模式:一个生产者向特定队列发送消息,一个消费者读取消息。一条消息仅能被一个消费者消费。
发布/订阅模式:0个或多个消费者 读取topic消息,一条消息可以被每个消费者消费。订阅者必须保持活动状态才能订阅消息。如果订阅者创建了持久订阅,则会在订阅者重新连接时重新发布。
JMS API
JMS开发的基本步骤:
- 创建一个JMS
Connection Factory
- 通过
Connection Factory
来创建JMSConnection
- 启动JMS
Connection
- 通过Connection创建JMS
Session
- 创建JMS
Destination
- 创建JMS
Producer
,或者创建JMSMessage
,并设置Destination
- 创建JMS
Consumer
,或者是注册一个JMSMessage Listener
- 发送或者接收
Message
(s) - 关闭所有的JMS资源(Connection 、Session、Producer、Consumer等)
点对点模式
同步消费
MessageConsumer.receive()
方法,会阻塞。
异步消费
使用MessageListener
创建异步监听。
总结
每一条消息只会被一个消费者消费
消息被消费会,不会再被存储。
多个消费者消费一个队列,未被消费的消息,会被轮询被已经活动的消费者 消费。
发布/订阅模式
订阅者类型
- 活动持久订阅者。设置了 Connection 的 ClientId 的订阅者。
- 离线持久订阅者。下线了的持久订阅者
- 活动非持久订阅者。未设置ClientId,默认情况。
要用持久化订阅,发送者要用DelIveryMode.PERSISTENT
模式发送,在连接之前设定。一定要设置完成后,再start这个connection
Message Durability 与 Message Persistence 是不同的,Message Durability 仅指发布订阅模式下的订阅者
点对点模式与发布订阅模式比较
比 较点 | topic | queue |
---|---|---|
工作模式 |
如果当前没有订阅者,则所有消息丢弃。 每条消息都会发送到所有订阅者 |
没有消费者,消息不会丢弃。 如果有多个消费者,消息采用**负载均衡模式(轮询)**保证一条消息只会发送给一个消费者 |
有无状态 | 无状态 | queue数据会以文件形式保存,或者配置成DB。 |
投递完整性 | 当前没有订阅者,则所有消息丢弃 | 消息不会丢弃 |
处理效率 | 消息会根据订阅者数量复制,订阅者数量多性能会降低。 | 一条消息发送到一个消费者,消费者数量不影响效率。 |
JMS
发送ObjectMessage时,如果是自定义的对象,需要设置trustAllPackages
消息的可靠性和持久性
设置方式
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //默认是持久性的。
注意:只有queue 持久化才有意义。topic的持久化没有任何意义,因为如果没有订阅者,发送的消息就是没有价值的。
生产者和消费者事务及ACK
事务
设置方式
session = connection.createSession(useTransaction,Session.AUTO_ACKNOWLEDGE);
session.commit();
总结
生产者开启了事务,必须显性的提交事务,不然消息不会发送到broker。
事务偏生产者,ACK偏消费者,消费者端不需要设置事务,如果设置了事务,则ACK不会提交,会导致重复消费。
ACK
提交,指调用
session.commit()
方法。ACK指调用了message.acknowledge()
方法。
非事务模式下ACK
生产者设置为自动ACK,消费者设置为自动ACK,正常
生产者设置为自动ACK,消费者设置为手动ACK,如果没有调用ACK方法,则会重复消费。
生产者设置为手动ACK,消费者设置为自动ACK,正常
事务模式下ACK
生产者ACK设置为自动,消费者ACK设置为自动。已提交,正常
生产者ACK设置为自动,消费者ACK设置为自动。未提交,会出现重复消费
生产者ACK设置为自动,消费者ACK设置为手动。已提交,未 ACK ,正常
生产者ACK设置为自动,消费者ACK设置为手动。未提交,已ACK,会出现重复消费
生产者ACK设置为手动,消费者ACK设置为自动。已提交,正常
生产者ACK设置为手动,消费者ACK设置为自动。未提交,会出现重复消费
生产者ACK设置为手动,消费者ACK设置为自动。未提交,已ACK,会出现重复消费
生产者ACK设置为手动,消费者ACK设置为自动。未提交,未ACK,会出现重复消费
生产者ACK设置为手动,消费者ACK设置为手动。已提交,已ACK,正常
生产者ACK设置为手动,消费者ACK设置为手动。已提交,未ACK,正常
总结
在非事务中,消费者根据是否ACK,决定消息是否标记未已消费。
在事务中,消费者根据事务是否提交,决定消息是否标记为已消费。如果事务回滚,则消息会被再次投递。
spring 整合 Activemq
spring framework
<!-- activemq依赖--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency>
spring boot
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 连接池可以单独依赖 -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version>
</dependency>
spring:activemq:user: adminpassword: adminbroker-url: tcp://192.168.153.129:61616pool:enabled: truemax-connections: 10jms:pub-sub-domain: false #只能监听队列,不能是topic。
重点:
@EnableJms注解:
@JmsListener 注解:
属性参考类:
ActiveMQProperties
,JmsProperties
mqtt
Activemq默认就开启的
WEB端使用参考:
mqtt.rar
高级特性
异步投递及确认
同步发送,send()方法会阻塞,直到broker给出一个确认消息。非事务、持久化消息 场景默认的方式。
异步发送,不会阻塞,会提高吞吐量。大部分场景的默认方式。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setUseAsyncSend(true);//异步发送
异步发送,可能会丢失消息,可以在send()方法中带上AsyncCallback
类型参数
producer.send(message, new AsyncCallback() {@Overridepublic void onSuccess() {try {System.out.println("消息发送成功:" + message.getStringProperty("msgId"));} catch (JMSException e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {try {System.out.println("出现异常:" + message.getStringProperty("msgId"));} catch (JMSException je) {je.printStackTrace();}}});
延迟投递和定时投递
Activemq 需要开启 Scheduler,broker的schedulerSupport
属性。
设置属性:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数,(不包括第一次) |
AMQ_SCHEDULED_CRON | String | Cron表达式 |
示例:
//延迟30秒,投递10次,间隔10秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);//CRON 表达式 ,定时投递。linux crontab 表达式。
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);//CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message);
设置延时投递后,会在scheduler中有一条记录,会记录任务信息。投递
repeat
次,则会把消息往 指定 destination 中 发送repeat + 1
次。
消费重试机制
重新发送消息情况
消息重发指的是消息可以被broker重新投递,不一定是之前的消费者。重新投递后,消费者可以重新消费此消息。
- 事务会话中,rollback的消息,会被重新投递。
- 消费端,使用客户端手动确认,还未确认,执行
session.recover()
,还未ACK的消息都会被重新投递。 - 所有未ACK消息,通过
session.close()
关闭事务,则这些消息会被broker立即重发。 - 消息被拉取后,ACK超时,也会重发
总结:即broker必须确认消息被消费了,才不会重发。1,2 会重新投递给原来的消费者,3,4 则不一定。
重发配置:http://activemq.apache.org/redelivery-policy
Java配置
// 手动修改RedeliveryPolicy(重发策略)
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);// 修改重发次数为3次
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);// 将重发策略设置到ConnectionFactory中
Spring配置
<!--定义RedeliveryPolicy(重发机制)-->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"><property name="useCollisionAvoidance" value="false"/><property name="useExponentialBackOff" value="true"/><property name="maximumRedeliveries" value="3"/><property name="initialRedeliveryDelay" value="1000"/><property name="backOffMultiplier" value="2"/><property name="maximumRedeliveryDelay" value="1000"/>
</bean>
<!--创建连接工厂-->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/><!--引用自定义重发机制--><property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
</bean>
sprint boot配置
@Configuration
public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15"); //连接数factory.setRecoveryInterval(1000L); //重连间隔时间factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次redeliveryPolicy.setMaximumRedeliveries(5);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}}
死信队列
设置每个队列的死信队列,activemq.xml
<destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap>
</destinationPolicy>
防止重复调用
1、如果是做数据库插入,设置一个唯一主键
2、通过第三方服务做消费记录,例如Redis。
Activemq应用场景
- 异步处理
- 应用解耦
- 流量消峰
- 日志处理(异步处理的一种应用)
- 消息通信
参考
spring JMS:https://spring.io/guides/gs/messaging-jms/
https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms-using
https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp
jms配置:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.jms.activemq
重发配置:http://activemq.apache.org/redelivery-policy
JMS和AMQP
1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
2.消息服务中两个重要概念:- 消息代理(message broker)和目的地(destination)- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3.消息队列主要有两种形式的目的地- 队列(queue):点对点消息通信(point-to-point)- 主题(topic):发布(publish)/订阅(subscribe)消息通信
4.点对点式:–消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列–消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
5.发布订阅式:–发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
6.JMS(Java Message Service)JAVA消息服务:–基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
7.AMQP(Advanced Message Queuing Protocol)–高级消息队列协议,也是一个消息代理的规范,兼容JMS–RabbitMQ是AMQP的实现
8.Spring支持–spring-jms提供了对JMS的支持–spring-rabbit提供了对AMQP的支持–需要ConnectionFactory的实现来连接消息代理–提供JmsTemplate、RabbitTemplate来发送消息–@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息–@EnableJms、@EnableRabbit开启支持
9.Spring Boot自动配置–JmsAutoConfiguration–RabbitAutoConfiguration
Activemq实战相关推荐
- 企业级实战02_SpringMVC整合ActiveMQ 实战需求
SpringMVC整合ActiveMQ 文章目录 一.Spring整合ActiveMQ实战 1.1. 创建一个父工程: 1.2. 引入依赖 1.3. 创建一个子项目生产者 1.4. 创建一个sprin ...
- java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费
摘选:https://my.oschina.net/u/3613230/blog/1457227 摘要: 最近在项目开发中,需要用到activemq,用的时候,发现在同一个项目中point-to-po ...
- ActiveMQ实战篇之 java和spring xml创建Broker(一)
ActivityMQ创建broker是直接通过配置IOC注入的,所以了解如何用纯JAVA代码和spring xml 创建broker可以让我们对AcitivtyMQ有一个更深入的了解 原本的配置 &l ...
- Apache ActiveMQ实战(1)-基本安装配置与消息类型
ActiveMQ简介 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的.可扩展的.稳定的和安全的企业级消息通信.ActiveMQ使用Apache ...
- activimq java集成_Java消息队列-Spring整合ActiveMq
1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Jav ...
- Java消息队列-Spring整合ActiveMq
1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个或多个程序之间的耦合,底层由Jav ...
- java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解
本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...
- ActiveMQ消息中间件之队列模式和主题模式详解
一.ActiveMQ消息中间件 在传统的消息发送和接收模式上,一般是以同步的方式来发送接收消息,以同步的方式来推送消息对我们的服务有时造成了很大的影响,比如当我们的服务器出现了故障,客户端推送消息到服 ...
- 【聊透SpringMVC】java找不到指定文件
RabbitMQ: 优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置 缺点:性能和吞吐量较差,不易进行二次开发 RocketMQ: 优点:性能好,稳定可靠,有活跃的中文社区,特点响应快 缺点:兼容 ...
最新文章
- 如何禁用UITableView选择?
- windows phone (23) ScrollViewer元素
- 实现接口时@Override注解问题
- 5.报错:ImportError: No module named win32api
- 博士生的deadline血泪史,这是一份来自Nature的避坑指南
- MYSQL 时间处理
- HashMap的使用方法及注意事项
- android反编译工具 ApkDec-Release-0.1
- jsp中实现文件下载 两种方法
- 【LeetCode】刷题工具
- 区块链:POA委员会选举机制
- linux中etc目录的作用,/etc 目录的作用到底是干什么用的?
- 阿里云个人网站备案流程
- List1_Excise
- Day 6.重大医疗伤害事件网络舆情能量传播过程分析*———以“魏则西事件”为例
- 商城电商day 06 三、商品详情业务需求分析
- 写一副对子_一副对子的传奇故事
- msec是毫秒; usec是微秒
- 21Winter\ C语言程序设计第六章
- 自定义小头像相互叠加