ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

下面我们演示一下间隔性重复投递;

生产者:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.slimsmart.study.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
public class Producer {
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 通过工厂创建一个连接
        Connection connection = factory.createConnection();
        // 启动连接
        connection.start();
        // 创建一个session会话 事务 自动ack
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 创建一个消息队列
        Destination destination = session.createQueue(queue_name);
        // 创建生产者
        MessageProducer producer = session.createProducer(destination);
        // 消息持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis());
        long time = 60 * 1000;// 延时1min
        long period = 10 * 1000;// 每个10s
        int repeat = 6;// 6次
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        // 发送消息
        producer.send(message);
        session.commit();
        producer.close();
        session.close();
        connection.close();
    }
}

消费者代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.slimsmart.study.activemq;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
    public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
    private static String queue_name = "test.queue";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
        // 通过工厂创建一个连接
        Connection connection = factory.createConnection();
        // 启动连接
        connection.start();
        // 创建一个session会话 事务 自动ack
        Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        // 创建一个消息队列
        Destination destination = session.createQueue(queue_name);
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("receive message :" + ((TextMessage) message).getText());
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        new CountDownLatch(1).await();
    }
}

ActiveMQ消息的延时和定时投递相关推荐

  1. ActiveMQ—消息特性(延迟和定时消息投递)

    ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...

  2. activemq消息丢失_Kafka or RabbitMQ:消息中间件选型深入分析

    消息中间件选型深入分析 --从Kafka与RabbitMQ的对比来看全局 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说 ...

  3. 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践

    简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...

  4. activemq nodejs stomp 重连机制_5分钟优劣分析 Kafka、RabbitMQ、RocketMQ、ActiveMQ消息队列...

    一.资料文档 Kafka:中,有kafka作者自己写的书,网上资料也有一些. rabbitmq:有一些不错的书,网上资料多. zeromq:少.没有专门写zeromq的书,网上的资料多是一些代码的实现 ...

  5. springboot整合rocketMQ记录 实现发送普通消息,延时消息

    一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...

  6. JMS学习八(ActiveMQ消息持久化)

    JMS学习八(ActiveMQ消息持久化) ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,还有一种内存存储的方式,由于内存不属于持久化范畴,而且如果使用内存队列,可 ...

  7. SpringBoot集成ActiveMq消息队列实现即时和延迟处理

    原文链接:https://blog.csdn.net/My_harbor/article/details/81328727 一.安装ActiveMq 具体安装步骤:自己谷歌去 二.新建springbo ...

  8. 【转】ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...

  9. ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...

最新文章

  1. MySQL 中 6 个常见的日志问题
  2. redis事务不具有回滚机制,那么它是如何进行事务控制的
  3. python大数据工程师 培训_大数据工程师学习之路
  4. java impala_Java实现impala操作kudu
  5. 【转】动态模型及其求解介绍–番外篇
  6. 做windows界面,用QT还是MFC?
  7. idea中不重启服务器更改代码(使用jrebel)
  8. 【老孙随笔】想学程序设计,先学人生设计!
  9. oracle归档日志百分比,Oracle归档日志处理
  10. (十六)51单片机——红外遥控
  11. linux系统nohob安装,Linux启动详解1
  12. tp-link与台式计算机连接教程,【详细图解】TP-Link TL-WDR6510路由器电脑设置教程...
  13. jsp+css实现图片自动轮换
  14. python aks_使用环回aks和terraform构建基于打字稿的游戏后端
  15. Android——Timer停不下来的解决方法
  16. Cartesi 举办的2023 黑客马拉松
  17. 坐标系统和投影变换基础知识及其在ArcGIS桌面产品中的应用(二)
  18. R语言和hadoop
  19. 任务调度-xxl-job
  20. 推荐两款支持在linux下运行ASP.NET网站的国产免费WEB服务器软件

热门文章

  1. 手机应用:非功能需求 Check List
  2. notepad++ java编码,输出中文字符时,编译出错
  3. 怎样实现企业管理系统的操作日志功能
  4. (九)javaScript的基本使用
  5. jQuery js 互转
  6. Jsp页面中使用fckeditor控件的两种方法
  7. 洛谷P3688/uoj#291. [ZJOI2017]树状数组
  8. 在C#中用COM操作CAD
  9. canvas转化图片并下载
  10. 安全预测 影响企业风险管理的三大趋势