ActiveMQ集群
1.ActiveMQ集群介绍
1.为什么要集群?
实现高可用,以排除单点故障引起的服务中断
实现负载均衡,以提升效率为更多客户提供服务
2.集群方式
客户端集群:让多个消费者消费同一个队列
Broker Cluster:多个Broker之间同步消息(做不了高可用,可以实现负载均衡)
Master-Slave:高可用(做不了负载均衡)
3.ActiveMq失效转移
允许当其中一台消息服务器宕机时,客户端在传输层上重新连接其他消息服务器。
语法:failover:(uri1,...uriN)?transportOptions
transportOptions参数说明:randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略
initialReconnectDelay默认为10,默认10毫秒,表示第一次尝试重连之间等待的时间。
maxReconnectDelay:默认30000,单位毫秒,最长重连时间的间隔
4.Master/Slave集群配置
Share nothing storage master/slave(已经过时,5.8之后移除)
shared storage master/slave 共享存储(实际上是共享同一文件夹,只不过采用排他锁,所以只有一个master节点可以访问,当此服务宕机,另一台slave可以快速强占排他锁,所以不会造成数据丢失,使用同一文件夹下的东西。如果多态服务器的话需要搭建文件共享服务器)
Replicated LevelDB Store 基于复制的LevelDB Store
1.共享存储原理:(获取排他锁才可以提供消息服务)--简单方式
2.Replicated LevelDB Store 基于复制的LevelDB Store原理(基于zookeper)
3.两种方式对比:
高可用 | 负载均衡 | |
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |
4.三台机器的完美集群方案:(实现高可用和负载均衡)
2.ActiveMQ集群配置
1.方案介绍
方案如下: B与C采用master-slave共享文件夹存储(任一时刻只有一个可以占有排他锁,也就是只有一个可以提供服务),当A宕机之后B会获取资源锁提供服务---实现高可用
A与B、A与C都采用Broker 集群,可以同时提供服务,不管B与C谁获取锁,A都可以提供服务(A只能提供服务消费消息,不生成消息)---实现负载均衡
配置如下:(同一个电脑不同端口模拟集群)
2.配置文件 (修改的配置文件都是在apache-activemq-5.15.6\conf文件夹下)
(1)activemq-a下面的配置:---A服务器
activemq.xml:(注释掉其他协议,服务端口使用61616端口;增加静态网络连接器,同时连接B与C)
jetty.xml: 采用默认的8161端口
(2)activemq-b下面的配置:---B服务器
activemq.xml:(增加与A的静态连接器,修改服务端口采用61617,修改共享文件夹的地址)
jitty.xml:修改端口采用8162
(3)activemq-c下面的配置:---C服务器
activemq.xml:(增加与A的静态连接器,修改服务端口采用61618,修改共享文件夹的地址)
jitty.xml:修改端口采用8163
3.依次启动ActiveMq进行测试
启动顺序是:A->B->C,下面通过端口信息验证集群:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2423756TCP [::]:8162 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:55338 127.0.0.1:61616 ESTABLISHED 2423756TCP 127.0.0.1:61616 127.0.0.1:55338 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2423756TCP 127.0.0.1:55345 127.0.0.1:61617 ESTABLISHED 2423496TCP 127.0.0.1:61617 127.0.0.1:55345 ESTABLISHED 2423756TCP [::]:61617 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618
由于先启动的B,B占有排斥锁,所以B(8162-61617)处于监听状态,而C与B采用共享文件排他锁集群,所以C处于阻塞状态,也就是没有监听端口,被阻塞。
停掉B服务器,模拟B服务器宕机的情况,再次查看端口:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2423848TCP [::]:8163 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:56704 127.0.0.1:61616 ESTABLISHED 2423848TCP 127.0.0.1:61616 127.0.0.1:56704 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618TCP 0.0.0.0:61618 0.0.0.0:0 LISTENING 2423848TCP 127.0.0.1:56791 127.0.0.1:61618 ESTABLISHED 2423496TCP 127.0.0.1:61618 127.0.0.1:56791 ESTABLISHED 2423848TCP [::]:61618 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop
由于B宕机,所以C会强占排他锁,也就是会监听端口,所以可以看到8163与61618端口的监听状态。
现在模拟C也宕机,将C服务器也停掉。再次查看端口信息:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 127.0.0.1:57242 127.0.0.1:61617 SYN_SENT 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop
此时A服务器仍然在监听端口。
上面证明集群搭建成功,下面重新开启A->B->C服务器开始程序验证。
4.程序验证集群
服务器开启顺序性:A->B->C(此时B占有排他锁,B监听端口,A一直监听)
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2428232TCP 127.0.0.1:60520 127.0.0.1:61616 ESTABLISHED 2427192TCP 127.0.0.1:61616 127.0.0.1:60520 ESTABLISHED 2428232TCP [::]:61616 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2427192TCP 127.0.0.1:60513 127.0.0.1:61617 ESTABLISHED 2428232TCP 127.0.0.1:61617 127.0.0.1:60513 ESTABLISHED 2427192TCP [::]:61617 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2428232TCP [::]:8161 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2427192TCP [::]:8162 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163
代码测试:
生产者: url加了失效策略,而且采用随机选取,生产消息的地址只有B与C服务器地址
package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 生产消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:04:41*/ public class MsgProducer {private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1创建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory创建connectionConnection connection = connectionFactory.createConnection();// 3.启动connection connection.start();// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建Destination(Queue继承Queue)Queue destination = session.createQueue(queueName);// 6.创建生产者producerMessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {// 7.创建Message,有好多类型,这里用最简单的TextMessageTextMessage tms = session.createTextMessage("textMessage:" + i);// 8.生产者发送消息 producer.send(tms);System.out.println("send:" + tms.getText());}// 9.关闭connection connection.close();}}
消费者:url加了失效策略,而且采用随机选取,生产消息的地址有ABC服务器
package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 消费消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:26:41*/ public class MsgConsumer {private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1创建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory创建connectionConnection connection = connectionFactory.createConnection();// 3.启动connection connection.start();// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建Destination(Queue继承Queue)Queue destination = session.createQueue(queueName);// 6.创建消费者consumerMessageConsumer consumer = session.createConsumer(destination);// 7.给消费者绑定监听器(消息的监听是一个异步的过程,不可以关闭连接,绑定监听器线程是一直开启的,处于阻塞状态,所以可以在程序退出关闭)consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 7.1由于消费者接受的是TextMessage,所以强转一下TextMessage tms = (TextMessage) message;try {System.out.println("接收消息:" + tms.getText());} catch (JMSException e) {e.printStackTrace();}}});}}
启动生产者发布100条消息:
管理界面查看:(消息被发布在B服务器,B占有排他锁,C处于阻塞)
A服务不生产消息所有A不会消费消息:(通过A查看网络连接器)
关掉B服务器之后查看C服务器:(验证B与C共享同一文件夹,且强占排他锁)
查看共享文件夹:
此时A与C处于监听状态,启动消费者:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2431676TCP [::]:8161 [::]:0 LISTENING 2431676 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2432188TCP [::]:8163 [::]:0 LISTENING 2432188TCP [::1]:8163 [::1]:63600 TIME_WAIT 0TCP [::1]:8163 [::1]:63648 TIME_WAIT 0TCP [::1]:8163 [::1]:63649 TIME_WAIT 0TCP [::1]:8163 [::1]:63650 TIME_WAIT 0TCP [::1]:8163 [::1]:63651 TIME_WAIT 0TCP [::1]:8163 [::1]:63652 TIME_WAIT 0
查看控制台如下:连接到A服务器消费信息
总结:上面的配置方案B与C是为了实现高可用,也就是一台宕机之后另一台马上强占排他锁提供服务(需要共享文件夹实现共用同一文件夹下的资源与锁),B与C可以生产消息,也可以提供消费消息,但是同一时刻只有一个提供服务。
提供A是为了与B、C实现负载均衡,A不生产消息,但是可以消费消息,替B或者C分担压力。
转载于:https://www.cnblogs.com/qlqwjy/p/9728425.html
ActiveMQ集群相关推荐
- ActiveMQ — 集群 — 安装与配置
2019独角兽企业重金招聘Python工程师标准>>> 1. 服务器配置 推荐集群的数量为基数,并且在这些机器上安装JDK,配置环境变量. 2. 集群方式 常用的集群方式有三种: 基 ...
- 使用jmeter对ActiveMQ集群性能方案进行评估--转载
原文地址:http://www.51testing.com/html/78/23978-143163.html 1.测试概要 1.1 关于 这篇文档中涉及的基于JMS的消息系统能为应用程序提供可靠的, ...
- 分布式ActiveMQ集群--转载
原文地址:http://shensy.iteye.com/blog/1752529 回顾总结前一段时间学习的ActiveMQ分布式集群相关的知识,分享出来希望对看到的人有所帮助. 一.分布式Activ ...
- ActiveMQ 集群部署
ActiveMQ 集群部署 本章演示 ActiveMQ 集群部署,默认您已经安装了 zookeeper 集群,并在各服务器上成功安装了 ActiveMQ 单节点实例如果您的环境还不满足条件请参考前面的 ...
- ActiveMQ集群架构与原理解析
初识JMIS与其专业术语 小伙伴们大家好,现在我们和大家一起了解一下古老而又神秘的消息中间件"ActiveMQ".首先,说起ActiveMQ,就必须先聊聊JMS (Java Mes ...
- centos activemq 集群配置 Networks of Brokers
1.安装JDK运行环境 #cd /opt#wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=a ...
- linux activemq修改端口号,linux下 activemq集群配置
1.简述:回想老王打电话讲故事案例. 2.优势:解耦,异步,横向扩展,顺序保障,安全可靠... 3.JMS(java message service),是java平台中关于面向消息中间件的API,用于 ...
- activemq mysql集群配置_ActiveMQ专题--集群,高可用方案
ActiveMQ集群部署方式 Master-Slave部署方式 Broker-Cluster部署方式 Master-Slave与Broker-Cluster相结合的部署方式 Shared databa ...
- ActiveMQ的几种集群配置
ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很 ...
最新文章
- [skill] vim 操作多个window
- TensorFlow快餐教程:程序员快速入门深度学习五步法
- MySQL复制经常使用拓扑结构具体解释
- android中自适应布局教程,Android自适应布局设计技巧
- 大型互联网公司必考java面试题与面试技巧
- 敏捷转型历程 - Sprint3 回顾会
- php xml写入数据库中,PHP读取xml并写入数据库示例
- [翻译] C# 8.0 新特性
- 36产生用户恶情绪和报复情绪的原因
- AI医疗 | 新开源计算机视觉技术用于新生儿胎龄估计
- 【codevs2287】火车站,第一个A掉的钻石题(迟来的解题报告)
- kinect获取实时深度数据
- [Java] Lambda表达式
- 基于 AngularJS 的 UI 框架 Suave UI
- Linux服务器挂载ntfs硬盘,Linux中挂载NTFS格式的硬盘的方法
- 计算机太卡了怎么解决,电脑太卡怎么办最有效
- 幂函数在计算机中怎么下,对数指数幂函数模拟计算机.doc
- conda创建Python虚拟环境
- 三合一剪弦器怎么用_吉他换弦时多余的弦用什么工具剪掉?
- 操作系统中断概念梳理