概述

  • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
  • ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
  • 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
  • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
  • ActiveMQ宕机了,消息不会丢失的机制。

有效的消息存储方式

  • 基于文件的存储方式,是以前的默认消息存储,现在不用了
  • AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。
  • 消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

KahaDB消息存储(目前默认)

  • 基于日志文件,从ActiveMQ5.4开始默认的持久化插件

  • KahaDB的存储原理

    KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。

文件类型 说明
db-number.log KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
db.data 该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。
db.free 当问当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID
db.redo 用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。
lock 文件锁,表示当前kahadb独写权限的broker。

JDBC消息存储

消息基于JDBC存储的

LevelDB消息存储

  • 这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。

  • 但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

  • 默认配置如下:

    <persistenceAdapter><levelDB directory="activemq-data"/>
    </persistenceAdapter>
    

JDBC Message Store with ActiveMQ Journal

JDBC存储消息详解

架构图

实现过程

MQ+MySQL实现消息持久化存储

(1)添加mysql数据库的驱动包到lib文件夹

wget   -P    保存目录   https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.17/mysql-connector-java-8.0.17.jar

(2)jdbcPersistenceAdapter配置

修改activemq.xml配置文件

  • 修改前的KahaDB
<persistenceAdapter><kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
  • 修改后的jdbcPerstenceAdapter
<persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
  • dataSource是指定将要引用的持久化数据库的bean名称
  • createTableOnStartup是否在启动的时候创建数据库表,默认是true,这样每次启动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false。

(3)数据库连接池配置

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.67.140:3306/activemq2020?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="123456"/> <property name="poolPreparedStatements" value="true"/> </bean>

(4)建库SQL和创表说明

  • 建一个名为activemq2020的数据句酷

  • 三张表说明

    • ACTIVEMQ_MSGS

      字段 说明
      ID 自增的数据库主键
      CONTAINER 消息的Destination
      MSGID_PROD 消息发送者的主键
      MSG_SEQ 是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
      EXPIRATION 消息的过期时间,存储的是从1970-01-01到现在的毫秒数
      MSG 消息本体的Java序列化对象的二进制数据
      PRIORITY 优先级,从0~9,数值越大优先级越高

      消息表,缺省表名ACTIVEMQ_MSGS,Queue和Topic都存在里面,结构如下

    • ACTIVEMQ_ACKS

    • ACTIVEMQ_LOCK

      表ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker

如果新建数据库ok,上述配置ok,代码运行ok,3张表会自动生成

如果表没生成,可能需要自己创建

-- auto-generated definition
create table ACTIVEMQ_ACKS
(CONTAINER     varchar(250)     not null comment '消息的Destination',SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',SUB_NAME      varchar(250)     not null comment '订阅者名称',SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',PRIORITY      bigint default 5 not null comment '优先级,默认5',XID           varchar(250)     null,primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';create index ACTIVEMQ_ACKS_XIDXon ACTIVEMQ_ACKS (XID);-- auto-generated definition
create table ACTIVEMQ_LOCK
(ID          bigint       not nullprimary key,TIME        bigint       null,BROKER_NAME varchar(250) null
);-- auto-generated definition
create table ACTIVEMQ_MSGS
(ID         bigint       not nullprimary key,CONTAINER  varchar(250) not null,MSGID_PROD varchar(250) null,MSGID_SEQ  bigint       null,EXPIRATION bigint       null,MSG        blob         null,PRIORITY   bigint       null,XID        varchar(250) null
);create index ACTIVEMQ_MSGS_CIDXon ACTIVEMQ_MSGS (CONTAINER);create index ACTIVEMQ_MSGS_EIDXon ACTIVEMQ_MSGS (EXPIRATION);create index ACTIVEMQ_MSGS_MIDXon ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);create index ACTIVEMQ_MSGS_PIDXon ACTIVEMQ_MSGS (PRIORITY);create index ACTIVEMQ_MSGS_XIDXon ACTIVEMQ_MSGS (XID);

(5)代码运行验证

一定要开启持久化

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
  • 队列

    • 生产者

      package com.demo.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Producer {private static final String ACTIVEMQ_URL = "nio://192.168.67.130:61616";private static final String ACTIVEMQ_QUEUE_NAME = "Queue-JdbcPersistence";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);MessageProducer messageProducer = session.createProducer(queue);messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("Queue-JdbcPersistence测试消息" + i);messageProducer.send(textMessage);}session.commit();System.out.println("消息发送完成");messageProducer.close();session.close();connection.close();}
      }
      
    • 消费者

      package com.demo.queue;import lombok.SneakyThrows;
      import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
      import java.io.IOException;public class Consumer {private static final String ACTIVEMQ_URL = "nio://192.168.67.130:61616";private static final String ACTIVEMQ_QUEUE_NAME = "Queue-JdbcPersistence";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);MessageConsumer messageConsumer = session.createConsumer(queue);connection.start();messageConsumer.setMessageListener(new MessageListener() {@SneakyThrows@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;session.commit();System.out.println("消费者收到消息" + textMessage.getText());}}});System.in.read();}
      }
      
  • 主题

    • 生产者

      package com.demo.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Producer {private static final String ACTIVEMQ_URL = "nio://192.168.67.130:61616";private static final String ACTIVEMQ_TOPIC_NAME = "Topic-JdbcPersistence";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.setClientID("我是生产者张三");Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);MessageProducer messageProducer = session.createProducer(topic);messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("Topic-JdbcPersistence测试消息" + i);messageProducer.send(textMessage);}session.commit();System.out.println("主题发送到MQ完成");messageProducer.close();session.close();connection.close();}
      }
      
    • 消费者

      package com.demo.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
      import java.io.IOException;public class Consumer1 {private static final String ACTIVEMQ_URL = "nio://192.168.67.130:61616";private static final String ACTIVEMQ_TOPIC_NAME = "Topic-JdbcPersistence";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.setClientID("我是消费者李四");Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "我是消费者李四我要订阅这个消息");connection.start();topicSubscriber.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("消费者李四收到的消息: " + textMessage.getText());session.commit();} catch (JMSException e) {e.printStackTrace();}}}});System.in.read();}
      }
      

(6)数据库情况

  • 一旦运行生产code

    • 点到点

      • 在点对点类型中,当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中

      • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

      • 而且点对点类型中消息一旦被Consumer消费,就从数据中删除

      • 消费前的消息,会被存放到数据库

      • 上面的消息被消费后被MQ自动删除

    • 发布/订阅

      • 设置了持久订阅数据库里面会保存订阅者的信息

  • MySQL

    • queue

    • topic

      • ACTIVEMQ_ACKS表中的LAST_ACKED_ID记录了CLIENT_ID最后签收的一条消息

    • 而LAST_ACKED_ID和ACTIVEMQ_MSGS的ID字段是外键关联关系,这样就可以实现,Topic的消息保存到ACTIVEMQ_MSGS表内,还能根据ACTIVEMQ_ACKS表中的持久订阅者查到该订阅者上次收到的最后一条消息是什么

    • 值得注意的是,Topic内的消息是不会被删除的,而Queue的消息在被删除后,会在数据库中被删除,如果需要保存Queue,应该使用其他方案解决

注意事项

在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑

  • 数据库jar包

  • 注意把对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包

  • createTablesOnStartup属性

  • 默认为true,每次启动activemq都会自动创建表,在第一次启动后,应改为false,避免不必要的损失。

  • java.lang.IllegalStateException: LifecycleProcessor not initialized确认计算机主机名名称没有下划线

  • Table ‘activemq.ACTIVEMQ_ACKS’ doesn’t exist.

    • 在数据库中只创建了两张表(正常需要创建三张表:ACTIVEMQ_ACKS、ACTIVEMQ_LOCK、ACTIVEMQ_MSGS)

    • 查看ActiveMQ官网发现这样的解释:key最多只能1000个字符,utf8等中文编码会占用过多的字符,所以要采用latin1 或者ASCII编码

    • 重新建一个库,并在建库的时候指定默认编码方式

      create databases activemq2020 default character set latin1;
      

小结

  • 如果是queue

    • 在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息
  • 如果是topic,
    • 一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs
    • 在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。

JDBC Message store with ActiveMQ Journal(优化版的JDBC存储)

概述

  • *JDBC Message Store with ActiveMQ Journal====优化版的JDBC存储*
  • 这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。
  • JDBC 配合其自带的 high performance journal;根据官方说法,它内置的高性能journal的工作类似于在缓存层工作,消息会优先写入到journal,后台的定时任务会每隔一段时间间隔去。

  • 这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库

  • ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。

  • 当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。

    举个例子:

    生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

配置过程

  • 修改配置前

    <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" />
    </persistenceAdapter>
    
  • 修改配置后

    <persistenceFactory>        <journalPersistenceAdapterFactory journalLogFiles="5" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="../activemq-data" />
    </persistenceFactory>
    

以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别

  1. JDBC with journal的性能优于jdbc
  2. JDBC用于master/slave模式的数据库分享
  3. JDBC with journal不能用于master/slave模式
  4. 一般情况下,推荐使用jdbc with journal

总结

  • 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。

  • 持久化机制演变的过程:

    • 从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
  • ActiveMQ消息持久化机制有:

    类型 说明
    AMQ 基于日志文件
    KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用
    JDBC 基于第三方数据库
    Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案

ActiveMQ的消息存储和持久化相关推荐

  1. ActiveMQ 原理分析—消息持久化篇

    消息持久化策略 背景 当消息发送者(provider)发送消息后消费者(consumer)没启动.故障, 或者消息中心在发送者发送消息后宕机了.ActiveMQ是如何保证消息不丢失,消费者能够正常的消 ...

  2. ActiveMQ的消息存储(八)

    1.队列存储 Queues采取先进先出模式,同一时间,消息只会发送给某一个消费者,只有当该消息被消费并告知已收到时,它才能在代理的存储中被删除. 而对于持久性Topic来说,每一个消费者都会获取消息的 ...

  3. 消息中间件ActiveMQ 5:可持久化方式AMQ和KahaDB

    文章目录 ActiveMQ 的持久化方式 一.需要进行消息持久化的原因 二.持久化方式 1.AMQ message Store(了解) 2.KahaDB消息存储(默认) ActiveMQ 的持久化方式 ...

  4. MQ之ActiveMQ

    Activemq 前言 1.入门概述 1.1 MQ 的产品种类和对比 1.2 MQ 的产生背景 1.2.1 系统之间接口耦合比较严重 ----解耦 1.2.2 面对大流量并发时,容易被冲垮 ----- ...

  5. ActiveMQ知识概括

    ActiveMQ知识概括 ActiveMQ简介 Java实现ActiveMQ JMS规范与落地 ActiveMQ的broker Spring,SpringBoot整合ActiveMQ ActiveMQ ...

  6. activemq原理_ActiveMQ(二)

    上一篇文章对ActiveMQ有了初步认识,了解了其大致原理.接下来说说实战中ActiveMQ的应用. 幸运的金荷,公众号:梁同学CodingActiveMQ(一) 八.Broker *上一篇讲到,启动 ...

  7. ActiveMQ 持久化讯息数据库信息

    www.MyException.Cn   发布于:2012-11-10 10:48:50   浏览:0次 ActiveMQ 持久化消息数据库信息 最近有网友问我,ActiveMQ持久化的中表结构是什么 ...

  8. JMS之——ActiveMQ消息持久化

    转载请注明出处:http://blog.csdn.net/l1028386804/article/details/68997105 之前的几篇博文中,我们实现的ActiveMQ消息未实现消息的持久化, ...

  9. activemq mysql 配置详解_activeMQ数据库配置

    ActiveMQ很好的支持了消息的持久性(Persistence).消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化, 即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后 ...

最新文章

  1. MQTT在Windows下搭建MQTT服务器
  2. 二分类神经网络的特征光谱---2-3至2-9
  3. 在python中查看关键字、需要执行_python关键字以及含义,用法
  4. 千兆网综合布线系统的线缆选型
  5. [置顶] Java Socket实战之一 单线程通信
  6. 如何使用Redis做MySQL的缓存
  7. 论文浅尝 | GNN with Generated Parameters for Relation Extraction
  8. Linux SVN迁移备份的三种方法
  9. Navicat Premium 12快捷键
  10. 【分享一些自己收集的API接口---欢迎点赞收藏】
  11. DIRECTSHOW中的视频捕捉
  12. coq程序编写好用的IDE推荐
  13. CVPR21-无监督异常检测《CutPaste:Self-Supervised Learning for Anomaly Detection and Localization》
  14. java的try后面跟括号
  15. 每日codingame小游戏练习[2021.3.29](python3入门学习之rstrip方法)
  16. MATLAB - contour函数
  17. 微信小程序实现一个简单的倒计时效果
  18. css淡入淡出_CSS淡入淡出
  19. 东欧黑客入侵港股造市图利 半年涉款5300万
  20. 2015年 行人检测总结4

热门文章

  1. 我是如何自学 Python 的
  2. string 相等 java_java中String相等问题
  3. hooks 使用dva_Taro3 中使用dva
  4. linux oraclerman自动备份,Linux平台下的Oracle自动备份案例(使用RMAN)
  5. mask属性是css3的吗_CSS mask-image属性详细介绍(小结)
  6. docker用gpu的参数_从零开始入门 K8s | GPU 管理和 Device Plugin 工作机制
  7. [蓝桥杯2016初赛]卡片换位 bfs+set
  8. 树形dp ---- 树形换根dp F - The Maximum Subtree
  9. jsp教学网站百度文库_基于JSP的精品课程网站设计与实现
  10. Liunx下MySQL常用命令