ActiveMQ消息的延时和定时投递
-
ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。
Property name type description AMQ_SCHEDULED_DELAY long 延迟投递的时间 AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔 AMQ_SCHEDULED_REPEAT int 重复投递次数 AMQ_SCHEDULED_CRON String Cron表达式 下面我们演示一下间隔性重复投递;
生产者:
?123456789101112131415161718192021222324252627282930313233343536373839404142434445464748package
cn.slimsmart.study.activemq;
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
import
org.apache.activemq.ScheduledMessage;
public
class
Producer {
public
static
final
String broker_url =
"failover:(tcp://10.1.199.169:61616)"
;
private
static
String queue_name =
"test.queue"
;
public
static
void
main(String[] args)
throws
Exception {
ConnectionFactory factory =
new
ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
// 通过工厂创建一个连接
Connection connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话 事务 自动ack
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(queue_name);
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(
"test delay message:"
+ System.currentTimeMillis());
long
time =
60
*
1000
;
// 延时1min
long
period =
10
*
1000
;
// 每个10s
int
repeat =
6
;
// 6次
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 发送消息
producer.send(message);
session.commit();
producer.close();
session.close();
connection.close();
}
}
消费者代码:
?123456789101112131415161718192021222324252627282930313233343536373839404142434445464748package
cn.slimsmart.study.activemq;
import
java.util.concurrent.CountDownLatch;
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.MessageConsumer;
import
javax.jms.MessageListener;
import
javax.jms.Session;
import
javax.jms.TextMessage;
import
org.apache.activemq.ActiveMQConnection;
import
org.apache.activemq.ActiveMQConnectionFactory;
public
class
Consumer {
public
static
final
String broker_url =
"failover:(tcp://10.1.199.169:61616)"
;
private
static
String queue_name =
"test.queue"
;
public
static
void
main(String[] args)
throws
Exception {
ConnectionFactory factory =
new
ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
// 通过工厂创建一个连接
Connection connection = factory.createConnection();
// 启动连接
connection.start();
// 创建一个session会话 事务 自动ack
Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(queue_name);
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(
new
MessageListener() {
@Override
public
void
onMessage(Message message) {
try
{
System.out.println(
"receive message :"
+ ((TextMessage) message).getText());
message.acknowledge();
}
catch
(JMSException e) {
e.printStackTrace();
}
}
});
new
CountDownLatch(
1
).await();
}
}
ActiveMQ消息的延时和定时投递相关推荐
- ActiveMQ—消息特性(延迟和定时消息投递)
ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...
- activemq消息丢失_Kafka or RabbitMQ:消息中间件选型深入分析
消息中间件选型深入分析 --从Kafka与RabbitMQ的对比来看全局 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说 ...
- 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践
简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...
- activemq nodejs stomp 重连机制_5分钟优劣分析 Kafka、RabbitMQ、RocketMQ、ActiveMQ消息队列...
一.资料文档 Kafka:中,有kafka作者自己写的书,网上资料也有一些. rabbitmq:有一些不错的书,网上资料多. zeromq:少.没有专门写zeromq的书,网上的资料多是一些代码的实现 ...
- springboot整合rocketMQ记录 实现发送普通消息,延时消息
一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...
- JMS学习八(ActiveMQ消息持久化)
JMS学习八(ActiveMQ消息持久化) ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,还有一种内存存储的方式,由于内存不属于持久化范畴,而且如果使用内存队列,可 ...
- SpringBoot集成ActiveMq消息队列实现即时和延迟处理
原文链接:https://blog.csdn.net/My_harbor/article/details/81328727 一.安装ActiveMq 具体安装步骤:自己谷歌去 二.新建springbo ...
- 【转】ActiveMQ消息传送机制以及ACK机制详解
2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...
- ActiveMQ消息传送机制以及ACK机制详解
2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...
最新文章
- MySQL 中 6 个常见的日志问题
- redis事务不具有回滚机制,那么它是如何进行事务控制的
- python大数据工程师 培训_大数据工程师学习之路
- java impala_Java实现impala操作kudu
- 【转】动态模型及其求解介绍–番外篇
- 做windows界面,用QT还是MFC?
- idea中不重启服务器更改代码(使用jrebel)
- 【老孙随笔】想学程序设计,先学人生设计!
- oracle归档日志百分比,Oracle归档日志处理
- (十六)51单片机——红外遥控
- linux系统nohob安装,Linux启动详解1
- tp-link与台式计算机连接教程,【详细图解】TP-Link TL-WDR6510路由器电脑设置教程...
- jsp+css实现图片自动轮换
- python aks_使用环回aks和terraform构建基于打字稿的游戏后端
- Android——Timer停不下来的解决方法
- Cartesi 举办的2023 黑客马拉松
- 坐标系统和投影变换基础知识及其在ArcGIS桌面产品中的应用(二)
- R语言和hadoop
- 任务调度-xxl-job
- 推荐两款支持在linux下运行ASP.NET网站的国产免费WEB服务器软件