文章目录

  • 消息发布模式和接口
    • JMS发布消息模式
    • JMS API
  • 点对点模式
    • 同步消费
    • 异步消费
    • 总结
  • 发布/订阅模式
    • 订阅者类型
    • 点对点模式与发布订阅模式比较
  • JMS
  • 消息的可靠性和持久性
    • 设置方式
  • 生产者和消费者事务及ACK
    • 事务
      • 设置方式
      • 总结
    • ACK
      • 非事务模式下ACK
      • 事务模式下ACK
      • 总结
  • spring 整合 Activemq
    • spring framework
    • spring boot
  • mqtt
  • 高级特性
    • 异步投递及确认
    • 延迟投递和定时投递
    • 消费重试机制
      • 重新发送消息情况
      • Java配置
      • Spring配置
      • sprint boot配置
    • 死信队列
    • 防止重复调用
  • Activemq应用场景
  • 参考
    • JMS和AMQP

消息发布模式和接口

JMS发布消息模式

点对点模式:一个生产者向特定队列发送消息,一个消费者读取消息。一条消息仅能被一个消费者消费。

发布/订阅模式:0个或多个消费者 读取topic消息,一条消息可以被每个消费者消费。订阅者必须保持活动状态才能订阅消息。如果订阅者创建了持久订阅,则会在订阅者重新连接时重新发布。

JMS API

JMS开发的基本步骤:

  1. 创建一个JMS Connection Factory
  2. 通过Connection Factory来创建JMS Connection
  3. 启动JMS Connection
  4. 通过Connection创建JMS Session
  5. 创建JMS Destination
  6. 创建JMS Producer,或者创建JMS Message,并设置Destination
  7. 创建JMS Consumer,或者是注册一个JMS Message Listener
  8. 发送或者接收Message(s)
  9. 关闭所有的JMS资源(Connection 、Session、Producer、Consumer等)

点对点模式

同步消费

MessageConsumer.receive()方法,会阻塞。

异步消费

使用MessageListener 创建异步监听。

总结

每一条消息只会被一个消费者消费

消息被消费会,不会再被存储。

多个消费者消费一个队列,未被消费的消息,会被轮询被已经活动的消费者 消费。

发布/订阅模式

订阅者类型

  • 活动持久订阅者。设置了 Connection 的 ClientId 的订阅者。
  • 离线持久订阅者。下线了的持久订阅者
  • 活动非持久订阅者。未设置ClientId,默认情况。

要用持久化订阅,发送者要用DelIveryMode.PERSISTENT模式发送,在连接之前设定。一定要设置完成后,再start这个connection

Message Durability 与 Message Persistence 是不同的,Message Durability 仅指发布订阅模式下的订阅者

点对点模式与发布订阅模式比较

比 较点 topic queue
工作模式 如果当前没有订阅者,则所有消息丢弃。
每条消息都会发送到所有订阅者
没有消费者,消息不会丢弃。
如果有多个消费者,消息采用**负载均衡模式(轮询)**保证一条消息只会发送给一个消费者
有无状态 无状态 queue数据会以文件形式保存,或者配置成DB。
投递完整性 当前没有订阅者,则所有消息丢弃 消息不会丢弃
处理效率 消息会根据订阅者数量复制,订阅者数量多性能会降低。 一条消息发送到一个消费者,消费者数量不影响效率。

JMS

发送ObjectMessage时,如果是自定义的对象,需要设置trustAllPackages

消息的可靠性和持久性

设置方式

MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  //默认是持久性的。

注意:只有queue 持久化才有意义。topic的持久化没有任何意义,因为如果没有订阅者,发送的消息就是没有价值的。

生产者和消费者事务及ACK

事务

设置方式

session = connection.createSession(useTransaction,Session.AUTO_ACKNOWLEDGE);
session.commit();

总结

生产者开启了事务,必须显性的提交事务,不然消息不会发送到broker。

事务偏生产者,ACK偏消费者,消费者端不需要设置事务,如果设置了事务,则ACK不会提交,会导致重复消费。

ACK

提交,指调用 session.commit()方法。ACK指调用了message.acknowledge()方法。

非事务模式下ACK

生产者设置为自动ACK,消费者设置为自动ACK,正常

生产者设置为自动ACK,消费者设置为手动ACK,如果没有调用ACK方法,则会重复消费。

生产者设置为手动ACK,消费者设置为自动ACK,正常

事务模式下ACK

生产者ACK设置为自动,消费者ACK设置为自动。已提交,正常

生产者ACK设置为自动,消费者ACK设置为自动。未提交,会出现重复消费

生产者ACK设置为自动,消费者ACK设置为手动。已提交,未 ACK ,正常

生产者ACK设置为自动,消费者ACK设置为手动。未提交,已ACK,会出现重复消费

生产者ACK设置为手动,消费者ACK设置为自动。已提交,正常

生产者ACK设置为手动,消费者ACK设置为自动。未提交,会出现重复消费

生产者ACK设置为手动,消费者ACK设置为自动。未提交,已ACK,会出现重复消费

生产者ACK设置为手动,消费者ACK设置为自动。未提交,未ACK,会出现重复消费

生产者ACK设置为手动,消费者ACK设置为手动。已提交,已ACK,正常

生产者ACK设置为手动,消费者ACK设置为手动。已提交,未ACK,正常

总结

非事务中,消费者根据是否ACK,决定消息是否标记未已消费。

事务中,消费者根据事务是否提交,决定消息是否标记为已消费。如果事务回滚,则消息会被再次投递。

spring 整合 Activemq

spring framework

 <!--  activemq依赖--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency>

spring boot

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 连接池可以单独依赖 -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.5</version>
</dependency>
spring:activemq:user: adminpassword: adminbroker-url: tcp://192.168.153.129:61616pool:enabled: truemax-connections: 10jms:pub-sub-domain: false  #只能监听队列,不能是topic。

重点:

@EnableJms注解:

@JmsListener 注解:

属性参考类:ActiveMQPropertiesJmsProperties

mqtt

Activemq默认就开启的

WEB端使用参考:

mqtt.rar

高级特性

异步投递及确认

同步发送,send()方法会阻塞,直到broker给出一个确认消息。非事务、持久化消息 场景默认的方式。

异步发送,不会阻塞,会提高吞吐量。大部分场景的默认方式。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setUseAsyncSend(true);//异步发送

异步发送,可能会丢失消息,可以在send()方法中带上AsyncCallback类型参数

producer.send(message, new AsyncCallback() {@Overridepublic void onSuccess() {try {System.out.println("消息发送成功:" + message.getStringProperty("msgId"));} catch (JMSException e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {try {System.out.println("出现异常:" + message.getStringProperty("msgId"));} catch (JMSException je) {je.printStackTrace();}}});

延迟投递和定时投递

Activemq 需要开启 Scheduler,broker的schedulerSupport属性。

设置属性:

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数,(不包括第一次)
AMQ_SCHEDULED_CRON String Cron表达式

示例:

//延迟30秒,投递10次,间隔10秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);//CRON 表达式  ,定时投递。linux crontab 表达式。
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);//CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message);

设置延时投递后,会在scheduler中有一条记录,会记录任务信息。投递repeat次,则会把消息往 指定 destination 中 发送 repeat + 1 次。

消费重试机制

重新发送消息情况

消息重发指的是消息可以被broker重新投递,不一定是之前的消费者。重新投递后,消费者可以重新消费此消息。

  1. 事务会话中,rollback的消息,会被重新投递。
  2. 消费端,使用客户端手动确认,还未确认,执行session.recover(),还未ACK的消息都会被重新投递。
  3. 所有未ACK消息,通过session.close()关闭事务,则这些消息会被broker立即重发。
  4. 消息被拉取后,ACK超时,也会重发

总结:即broker必须确认消息被消费了,才不会重发。1,2 会重新投递给原来的消费者,3,4 则不一定。

重发配置:http://activemq.apache.org/redelivery-policy

Java配置

// 手动修改RedeliveryPolicy(重发策略)
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);// 修改重发次数为3次
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);// 将重发策略设置到ConnectionFactory中

Spring配置

<!--定义RedeliveryPolicy(重发机制)-->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"><property name="useCollisionAvoidance" value="false"/><property name="useExponentialBackOff" value="true"/><property name="maximumRedeliveries" value="3"/><property name="initialRedeliveryDelay" value="1000"/><property name="backOffMultiplier" value="2"/><property name="maximumRedeliveryDelay" value="1000"/>
</bean>
<!--创建连接工厂-->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/><!--引用自定义重发机制--><property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
</bean>

sprint boot配置

@Configuration
public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15");  //连接数factory.setRecoveryInterval(1000L); //重连间隔时间factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次redeliveryPolicy.setMaximumRedeliveries(5);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}}

死信队列

设置每个队列的死信队列,activemq.xml

<destinationPolicy><policyMap><policyEntries><policyEntry queue=">"><deadLetterStrategy><individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"processExpired="false"processNonPersistent="false"/></deadLetterStrategy></policyEntry></policyEntries></policyMap>
</destinationPolicy>

防止重复调用

1、如果是做数据库插入,设置一个唯一主键

2、通过第三方服务做消费记录,例如Redis。

Activemq应用场景

  • 异步处理
  • 应用解耦
  • 流量消峰
  • 日志处理(异步处理的一种应用)
  • 消息通信

参考

spring JMS:https://spring.io/guides/gs/messaging-jms/

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms-using

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp

jms配置:https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.jms.activemq

重发配置:http://activemq.apache.org/redelivery-policy

JMS和AMQP

1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
2.消息服务中两个重要概念:- 消息代理(message broker)和目的地(destination)- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3.消息队列主要有两种形式的目的地- 队列(queue):点对点消息通信(point-to-point)- 主题(topic):发布(publish)/订阅(subscribe)消息通信
4.点对点式:–消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列–消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
5.发布订阅式:–发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
6.JMS(Java Message Service)JAVA消息服务:–基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
7.AMQP(Advanced Message Queuing Protocol)–高级消息队列协议,也是一个消息代理的规范,兼容JMS–RabbitMQ是AMQP的实现
8.Spring支持–spring-jms提供了对JMS的支持–spring-rabbit提供了对AMQP的支持–需要ConnectionFactory的实现来连接消息代理–提供JmsTemplate、RabbitTemplate来发送消息–@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息–@EnableJms、@EnableRabbit开启支持
9.Spring Boot自动配置–JmsAutoConfiguration–RabbitAutoConfiguration

Activemq实战相关推荐

  1. 企业级实战02_SpringMVC整合ActiveMQ 实战需求

    SpringMVC整合ActiveMQ 文章目录 一.Spring整合ActiveMQ实战 1.1. 创建一个父工程: 1.2. 引入依赖 1.3. 创建一个子项目生产者 1.4. 创建一个sprin ...

  2. java 监听队列_spring+activemq实战之配置监听多队列实现不同队列消息消费

    摘选:https://my.oschina.net/u/3613230/blog/1457227 摘要: 最近在项目开发中,需要用到activemq,用的时候,发现在同一个项目中point-to-po ...

  3. ActiveMQ实战篇之 java和spring xml创建Broker(一)

    ActivityMQ创建broker是直接通过配置IOC注入的,所以了解如何用纯JAVA代码和spring xml 创建broker可以让我们对AcitivtyMQ有一个更深入的了解 原本的配置 &l ...

  4. Apache ActiveMQ实战(1)-基本安装配置与消息类型

    ActiveMQ简介 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的.可扩展的.稳定的和安全的企业级消息通信.ActiveMQ使用Apache ...

  5. activimq java集成_Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Jav ...

  6. Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个或多个程序之间的耦合,底层由Jav ...

  7. java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解

    本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...

  8. ActiveMQ消息中间件之队列模式和主题模式详解

    一.ActiveMQ消息中间件 在传统的消息发送和接收模式上,一般是以同步的方式来发送接收消息,以同步的方式来推送消息对我们的服务有时造成了很大的影响,比如当我们的服务器出现了故障,客户端推送消息到服 ...

  9. 【聊透SpringMVC】java找不到指定文件

    RabbitMQ: 优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置 缺点:性能和吞吐量较差,不易进行二次开发 RocketMQ: 优点:性能好,稳定可靠,有活跃的中文社区,特点响应快 缺点:兼容 ...

最新文章

  1. 如何禁用UITableView选择?
  2. windows phone (23) ScrollViewer元素
  3. 实现接口时@Override注解问题
  4. 5.报错:ImportError: No module named win32api
  5. 博士生的deadline血泪史,这是一份来自Nature的避坑指南
  6. MYSQL 时间处理
  7. HashMap的使用方法及注意事项
  8. android反编译工具 ApkDec-Release-0.1
  9. jsp中实现文件下载   两种方法
  10. 【LeetCode】刷题工具
  11. 区块链:POA委员会选举机制
  12. linux中etc目录的作用,/etc 目录的作用到底是干什么用的?
  13. 阿里云个人网站备案流程
  14. List1_Excise
  15. Day 6.重大医疗伤害事件网络舆情能量传播过程分析*———以“魏则西事件”为例
  16. 商城电商day 06 三、商品详情业务需求分析
  17. 写一副对子_一副对子的传奇故事
  18. msec是毫秒; usec是微秒
  19. 21Winter\ C语言程序设计第六章
  20. 自定义小头像相互叠加

热门文章

  1. Pocket PC 2003 SE设备仿真器网络设置
  2. JavaWeb 安全问题及解决方案
  3. 【Apache POI】基础处理方法总结
  4. Vitamio中文API文档(1)—— MediaStore
  5. React中props与state的区别
  6. 当INPUT 连续输入是连续触发
  7. printf按8进制、16进制输出
  8. Web开发人员有用的代码比较工具
  9. Spring Boot 专栏全栈开发实战
  10. win10虚拟机dhcp服务器设置,win10 dhcp服务器设置方法