消费者客户端成功接收一条消息的标志是:这条消息被签收。
消费者客户端成功接收一条消息一般包括三个阶段:

         1、消费者接收消息,也即从MessageConsumer的receive方法返回

2、消费者处理消息

3、消息被签收

其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置

activemq的消息确认机制就是文档中说的ack机制有:
    AUTO_ACKNOWLEDGE = 1    自动确认
    CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    SESSION_TRANSACTED = 0    事务提交并确认
    INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认 activemq 独有
  ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
  对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)
  与Broker之间建立一种简单的“担保”机制.
  手动确认和单条消息确认需要手动的在客户端调用message.acknowledge()
  消息重发机制RedeliveryPolicy 有几个属性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(10);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);

那么在整合activemq时候就只需要修改配置文件和客户端就可以了,activemq就是这种机制,例如支付宝支付回调的时候,只有我们返回一个success,支付那边才不会给我重发消息

配置文件:

import javax.jms.Queue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;@EnableJms
@Configuration
public class ActiveMQ4Config {  @Beanpublic Queue queue(){return new ActiveMQQueue("queue1");}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为10次redeliveryPolicy.setMaximumRedeliveries(10);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  ActiveMQConnectionFactory activeMQConnectionFactory =  new ActiveMQConnectionFactory("admin","admin",url);activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);return activeMQConnectionFactory;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂@Bean(name = "jmsQueueListener")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(activeMQConnectionFactory);//设置连接数factory.setConcurrency("1-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setSessionAcknowledgeMode(4);return factory;}}

消费者:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {private final static Logger logger = LoggerFactory.getLogger(Consumer.class);@JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")public void receiveQueue(final TextMessage text, Session session)throws JMSException {try {logger.debug("Consumer收到的报文为:" + text.getText());text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发} catch (Exception e) {    session.recover();// 此不可省略 重发信息使用
        }}
}

由此可以知道activemq的queue消息是可以保证消息不丢失,不会被重复消费的,因为会给每个消息设置一个唯一的id,当消息发送失败之后可以根据这个机制来进行消费,当然也是一种处理分布式事物的方法

消息中间件的模式是可以保证消息不会丢失的,持久化和自动重发,消息回签,都可以很好的避免那种机制。消费端代码发生异常了,可以自动重发,自动消息重发。由于之前在测试的时候足够看官方文档,所以理解说客户端发生异常了,是不可以进行重发的,但是今天了解之后,发觉还是自动重发的机制,利用回签机制进行的。

转载于:https://www.cnblogs.com/xiufengchen/p/10563402.html

springboot整合activemq加入会签,自动重发机制,持久化相关推荐

  1. SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNOWLEDGE)为什么失效啊?

    今天在家隔离办公,不太忙,然后就琢磨起来消息队列activeMQ的消息事务来解决分布式事务,但是奈何在SpringBoot整合activeMQ时,其消费者手动签收消息时出现了问题-->当acti ...

  2. activeMQ基础学习和SpringBoot整合activeMQ案例

    昨天仔细研究了activeMQ消息队列,也遇到了些坑,昨天晚上也写了篇文章记录坑的内容,其实上篇文章(SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNO ...

  3. springboot整合ActiveMQ(点对点和发布订阅)

    springboot整合ActiveMQ(点对点和发布订阅) ActiveMQ是什么,为什么使用MQ 是基于 Java 中的 JMS 消息服务规范实现的一个消息中间件. 1.系统解耦 采用中间件之后, ...

  4. SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.21 SpringBoot 整合 ActiveMQ

    SpringBoot [黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)] SpringBoot 开发实用篇 文章目录 Spring ...

  5. 详细的springboot整合activeMq安装与使用(上)

    最近在学习activeMQ消息中间件,特此记录一下,方便以后使用. 如果有不严谨的地方,欢迎大家提出,共同进步呀. 本章,会先讲解activeMq的基本介绍.安装.和更改用户名.端口号.下节会仔细讲解 ...

  6. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  7. Springboot整合ActiveMQ发送邮件

    虽然ActiveMQ以被其他MQ所替代,但仍有学习的意义,本文采用邮件发送的例子展示ActiveMQ 文章目录 1. 生产者 1.1 引入maven依赖 1.2 application.yml配置 1 ...

  8. springboot整合RabbitMQ实现延时自动取消订单

    1.pom依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>s ...

  9. SpringBoot整合第三方技术学习笔记(自用)

    SpringBoot整合第三方技术学习笔记 搬运黑马视频配套笔记 KF-4.数据层解决方案 KF-4-1.SQL 回忆一下之前做SSMP整合的时候数据层解决方案涉及到了哪些技术?MySQL数据库与My ...

最新文章

  1. Permission denied 故障
  2. mysql dba系统学习(20)mysql存储引擎MyISAM
  3. Python os.path路径模块中的操作方法总结
  4. 微信收款音响s3服务器断开,微信收款音响s2和s3有什么区别
  5. RTX5 | STM32H743+CubeMX+RTX5+两路FDCAN驱动+CANopen协议
  6. NCFM识别-Googlenet
  7. TF2.0-tf.keras.layers.Concatenate
  8. 0到50带圆圈的数字序号有需要的吗:)
  9. 02、差分特性阻抗仿真
  10. 第08篇:Mybatis事务处理
  11. 视频字幕 硬字幕 软字幕 外挂字幕 简介
  12. 智过网:2023年注册测绘师考试执业范围
  13. 标准差计算-python(有偏无偏)
  14. c++二叉树打印(只为美观)
  15. google账号已停用,此账号的使用方式似乎违反了Google的政策
  16. 云讯健身管理系统-11--NUXT和Redis
  17. IDM免费安装注册使用,两步注册成功
  18. [附源码]Java计算机毕业设计SSM道路桥梁工程知识文库系统
  19. 宣传单彩页_彩页宣传单文案如何设计客户才会接受
  20. Bellon(多么痛的领悟)

热门文章

  1. 漫步最优化二十二——收敛速率
  2. c语言查看变量类型_c语言外部链接的静态变量的四种类型
  3. 【渗透测试】XSS注入原理
  4. 隐马尔可夫模型的三个基本问题
  5. 网络爬虫之Url含有中文如何转码
  6. extend()与append()的区别
  7. 2-3树与2-3-4树【转载】
  8. 实数范围内(包含负数)的求模与求余运算异同
  9. Android studio 快速解决Gradle's dependency cache may be corrupt 和 Gradle配置 gradle-3.*-all.zip快速下载
  10. JAVA SAX解析XML文档