上一篇文章对ActiveMQ有了初步认识,了解了其大致原理。接下来说说实战中ActiveMQ的应用。

幸运的金荷,公众号:梁同学CodingActiveMQ(一)

八、Broker

*上一篇讲到,启动mq时可以指定,将启动信息保存在指定目录的命令

./activemqstart >/myactiveMQ/run_activemq.log

当然也有类似的启动的时候加载指定配置文件的命令

./activemq start xbean:file/myactive/active-apche-5.15.13/conf/active02.conf

含义:Broker相当于一个MQ的实例,之前都是通过虚拟机来跑,现在给我们提供了通过代码的方式来启动我们的MQ实例。以代码的形式,将ActiveMQ嵌入到Java代码中。

1.依赖pom

   com.fasterxml.jackson.core   jackson-databind   2.9.5   org.apache.activemq   activemq-all   5.15.9   org.apache.xbean   xbean-spring   3.16

2. 在代码中运行Broker

//ActiveMQ也支持在jvm中通信基于嵌入式的brokerBrokerService brokerService = new BrokerService();brokerService.setUseJmx(true);brokerService.addConnector("tcp://localhost:61616");brokerService.start();

3.在控制台可以看到服务已经启动。修改队列生产者和消费者的uri连接地址

private static final String ACTIVEMQ_URL = "tcp://localhost:61616";

4.运行生产者和消费者

九、SpringBoot整合MQ

1.导入依赖

   org.springframework.boot   spring-boot-starter-activemq   2.1.5.RELEASE

2.新建-队列生产者的配置文件

server:  port: 7777spring:  activemq:    broker-url: tcp://192.168.91.130:61616  #自己linux服务器上的地址    user: admin    password: admin  jms:    pub-sub-domain: false   # false = Queue | true = Topic#自己定义队列名称myqueue: boot-active-queue

3.将队列生产者的配置文件做成配置文件来调用

@Component@EnableJmspublic class ConfigBean {    @Value("${myqueue}")    private String myqueue;    @Bean   //    public Queue queue(){        return new ActiveMQQueue(myqueue);    }}

4.建立一个消息的生产者

@Componentpublic class Queue_Produce {    //利用MQ给我们提供的Template来操作消息    @Autowired    private JmsMessagingTemplate jmsMessagingTemplate;    //获取上面配置的队列    @Autowired    private Queue queue;    public void produceMsg(){        jmsMessagingTemplate.convertAndSend(queue,"******"+ UUID.randomUUID().toString().substring(0,6));    }    //延迟5秒发送    @Scheduled(fixedDelay = 5000L)    public void produceMsgScheduled(){        jmsMessagingTemplate.convertAndSend(queue,"******Scheduled"+ UUID.randomUUID().toString().substring(0,6));        log.info("send Scheduled Message");    }}

5.新建-队列消费者的配置文件

server:  port: 8888spring:  activemq:    broker-url: tcp://192.168.91.130:61616  #自己linux服务器上的地址    user: admin    password: admin  jms:    pub-sub-domain: false   # false = Queue | true = Topic#自己定义队列名称myqueue: boot-active-queue

6.监听队列生产者发送的消息

@Componentpublic class Queue_Consumer {    @JmsListener(destination = "${myqueue}")    public void receive(TextMessage textMessage) throws Exception{        System.out.println("消费者接收到消息:"+textMessage.getText());    }}

7.启动消费者、生产者进行测试

8.可以看到,生产者每隔5秒自动发送一条消息到MQ,消费者启动着监听接收并打印出了消息。

9.主题,也和队列大同小异。只是注意如下

 pub-sub-domain: true   # false = Queue | true = Topic
② new的是Topic
@Beanpublic Topic topic(){return new ActiveMQTopic(topicName);}
③配置的是主题,不是之前的队列
@Autowiredprivate Topic topic;

④先启动主题的消费者。更改端口号在运行主程序。再修改端口号运行主程序。这样便有了两个消费者,最后在启动生产者。模拟了“发布-订阅”。

十、ActiveMQ的传输协议

之前演示的例子中,默认都采用的是tcp://协议进行的MQ连接。除此之外,还有很多协议也可以用来进行传输。例如nio://协议

1.打开mq的配置文件。

①可以看到我们之前用的是openwrie协议描述URI头采用的tcp://这是MQ的默认消息协议

②除了openwrie协议之外,还有其他例如amqp\stomp\mqtt\ws协议。

2. NIO协议

修改conf配置文件

添加name=”nio”uri=”nio//0.0.0.0:61618?trace=true”/>

打开mq控制台Connections可以看到nio的身影

将之前代码中的tcp换成nio

3. auto+nio协议

auto+nio的协议:代表了将上述的amqp\stomp\mqtt\ws协议都集成到了一起外加nio协议。

name=”auto+nio”uri=”auto+nio//0.0.0.0:61618?trace=true”/>

十一、ActiveMQ的消息存储和持久化

1.持久化机制有哪些?

AMQ、KehaDBLevelDBJDBC、JDBC Message Store With ActiveMQ Journal(日志)

2.各个机制的特点?

①AMQ:采用文件存储~ 类似于Redis的dump.rdb(时间间隔内写入磁盘)

②KehaDB:

  • 默认消息存储机制

  • 类似于Redis的appendOnly.aof(日志追加)

  • 默认地址为/myactiveMQ/apache-activemq-5.15.13/data/Kahadb

  • 消息存储使用一个日志事务文件db-.log和仅仅使用一个索引文件db.data来存储它所有的地址。

  • 存储原理:4类文件1把锁

db-.log:前身AMQ,默认大小32MB,超过了它,自动db-.logdb.data:BTree索引db.redo:强制退出后,恢复BTree索引lock:文件锁db.free:空闲页(字典)

③LevelDB:和KehaDB非常相似,也是基于文件的本地数据库存储形式,使用LevelDB索引

④JDBC持久化机制-步骤

  • 采用的MQ+Mysql组合

  • 添加mysql数据库驱动jar包到lib文件夹

  • jdbcPersistenceAdapter配置

注意:createTablesOnStartup默认true,一般在第一次启动后将其改为false

  • 更改activemq.conf数据库连接池配置

mysql-ds引用持久化数据库的bean名称存放位置

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>     <property name="url" value="jdbc:mysql:// 192.168.0.110:3306/activemq?relaxAutoCommit=true"/>     <property name="username" value="root"/>     <property name="password" value=""/>     <property name="poolPreparedStatements" value="true"/> bean> 
  • 建仓SQL和建表  (*activemq启动日志文件 /data/activemq.log)

上述步骤若都操作正确,则会在windows数据库(配置文件中填写的数据库名-实现手动建立好)新建3张表。

注意:我在试验过程中,发现不管怎么更改配置文件,mq就是启动不起来,查看日志文件发现经常报下面的异常。错误解决办法。https://blog.csdn.net/lsxy117/article/details/47207567utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase

activemq_acks

activemq_lock

activemq_msgs

  • 代码运行验证、数据库情况

开启了持久模式(默认开启)DeliveryMode.Persisten

非持久模式DeliveryMode.NON_Persisten

点对点类型中: activemq_msgs

消息保存在broker或者数据库msg中

消息保存在内存中

一对多类型中:

activemq_acks

connection.setClientID("z4");

TopicSubscriber topicSubscriber =  session.createDurableSubscriber(topic, "备注");//创造持久化订阅者

connection.start();

会永久存在在acks表中,除非控制台将订阅者删除掉

  • 小总结、开发有坑

1.数据库jar包:mysql-jdbc驱动包放在/lib包下

2.在jdbcPersistenceAdapter标签中,createTablesOnstartup第一次为true后,下次启动将其改为false

3.下划线有坑:BeanFactory not initialized or alreadyclosed.

机器操作系统机器名中有”_”符号。改名重启即可。

⑤ JDBC Message Store With ActiveMQJournal(日志)

使用高速缓存写入技术。

生产者—快速生产了大量消息—到journal文件。

消费者—消费速度也很快。

journal文件挡在mysql之前

消息被消费者消费只剩20%再存到Mysql。大大减少了消息和Mysql的压力。

此时,再启动队列生产者,数据表msgs中不会再插入。消费者也照常运行。

Ps:消息都抵挡在了journal里了,还没到mysql。

3.ActiveMQ持久化机制小总结

JMS的可靠性?

持久性、事务、签收、可持久化:MQ所在服务器down了消息不会丢失的机制

演化过程?

       AMQ(最初)---High performance journal(同步推出Mysql解决方案)---KehaDB(默认)---LevelDB(基于文件)---Zookeeper+LevelDB(集群化方案)

未完待续。。。

activemq原理_ActiveMQ(二)相关推荐

  1. 分布式消息通信 ActiveMQ 原理 分析二

    本章重点: 1. unconsumedMessage 源码分析 2. 消费端的 PrefetchSize 3. 消息的确认过程 4. 消息重发机制 5. ActiveMQ 多节点高性能方案 消息消费流 ...

  2. HNSW算法原理(二)之删除结点

    原文:https://blog.csdn.net/CHIERYU/article/details/86647014 HNSW算法原理(二)之删除结点 本篇文章继之前的一篇文章 HNSW算法原理(一)  ...

  3. 一文读懂TOF深度相机技术原理--TI-Tintin-OPT8241二次开发和应用系列--Theory Level

    一文读懂TOF深度相机技术原理--TI-Tintin-OPT8241二次开发和应用系列--Theory Level 转载请附上出处,本文链接:https://www.cnblogs.com/pans0 ...

  4. 实现同步请求_图解 Promise 实现原理(二)—— Promise 链式调用

    摘要 很多同学在学习 Promise 时,知其然却不知其所以然,对其中的用法理解不了.本系列文章由浅入深逐步实现 Promise,并结合流程图.实例以及动画进行演示,达到深刻理解 Promise 用法 ...

  5. 【es】es 分布式一致性原理剖析(二)-Meta篇

    1.概述 转载:Elasticsearch分布式一致性原理剖析(二)-Meta篇 前言 "Elasticsearch分布式一致性原理剖析"系列将会对Elasticsearch的分布 ...

  6. 编译原理实验二:赋值语句的语法分析程序设计

    编译原理实验二:赋值语句的语法分析程序设计 1.1实验内容 目的: 在前面实验的基础上,通过设计.编制.调试一个典型的赋值语句的语法分析程序,实现对词法分析程序所提供的单词序列进行语法检查,进一步掌握 ...

  7. 微型计算机原理8255并行接口实验,微机原理实验二 8255A并行接口应用.pdf

    微机原理实验二 8255A并行接口应用 实验二 8255A 并行接口应用 一.实验目的 1.掌握8255A 的功能及方式0.1 的实现 2 .熟悉8255A 与CPU 的接口,以及传输数据的工作原理及 ...

  8. 导航电子地图数据中POI搜索技术原理之二

    导航电子地图数据中POI搜索技术原理之二 支持任意检索 <浅谈导航数据中POI搜索技术原理>一文介绍了导航电子地图中POI兴趣点数据进行关键字检索的基本原理. 其中,数据的组织方式如下: ...

  9. 编译原理实验二:Bison

    编译原理实验二:Bison 实验要求 1.了解Bision基础知识,如何将文法产生式转换为Bison语句 2.阅读/src/common/SyntaxTree.c,对应头文件 /include/Syn ...

最新文章

  1. [Prism]Composite Application Guidance for WPF(8)——事件
  2. 数组中的逆序对,为什么要在第一个小于等于的时候计数?
  3. 2017-6-3 jQuery 事件 DOM操作
  4. C#3.0 为我们带来什么(5) —— 匿名类型
  5. Oracle增加修改删除字段/主键
  6. 细胞计算机生命游戏,【图片】【20170108 其它內容】【转】生命游戏【三体吧】_百度贴吧...
  7. AfxParseURL
  8. JavaScript 中的函数介绍
  9. java解析XML Node与Element的区别
  10. 栈帧与操作数栈剖析及符号引用与直接引用的转换
  11. 大数据发展火爆,云计算平台主打安全至上
  12. asp.net弹出对话框
  13. new mediacontroller(this) this报错_面试官问你JS的this指向,你能跟他聊多少?
  14. N!阶层的二进制表示中最低位1的位置
  15. 华为MA5606T升级固件
  16. 中药知多少!!!!!
  17. FEC【筷云早报】2020年3月16日星期一
  18. 十个高质量自学网站,让你的技术突飞猛进
  19. 看完《指环王》说几句
  20. 青岛方言发音对照表(内附英文释义)

热门文章

  1. 古人云:脚踏实地没有办不成的事!入门java从这里一步一步学起!
  2. 2022-2028全球与中国室内垂直农场市场深度分析及十四五规划咨询建议报告
  3. rss技术为blog带来什么?
  4. 宏函数和自定义函数的区别
  5. ​让饭圈女孩杀入币圈
  6. mysql 的几种缓存_数据库缓存的几种方式
  7. 软件加入使用时间_当我设定软件使用时间之后。。。
  8. C语言中如何将二维数组作为函数的参数传递
  9. 二维数组作为函数参数传递的三种方式
  10. android AutoCompleteTextView 实现手机号格式化,附带清空历史的操作