1.ActiveMQ集群介绍

1.为什么要集群?   

  实现高可用,以排除单点故障引起的服务中断

  实现负载均衡,以提升效率为更多客户提供服务

2.集群方式

  客户端集群:让多个消费者消费同一个队列

  Broker Cluster:多个Broker之间同步消息(做不了高可用,可以实现负载均衡)

  Master-Slave:高可用(做不了负载均衡)

3.ActiveMq失效转移

  允许当其中一台消息服务器宕机时,客户端在传输层上重新连接其他消息服务器。

  语法:failover:(uri1,...uriN)?transportOptions

    transportOptions参数说明:randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略

                initialReconnectDelay默认为10,默认10毫秒,表示第一次尝试重连之间等待的时间。

                maxReconnectDelay:默认30000,单位毫秒,最长重连时间的间隔

4.Master/Slave集群配置

  Share nothing storage master/slave(已经过时,5.8之后移除)

  shared storage master/slave 共享存储(实际上是共享同一文件夹,只不过采用排他锁,所以只有一个master节点可以访问,当此服务宕机,另一台slave可以快速强占排他锁,所以不会造成数据丢失,使用同一文件夹下的东西。如果多态服务器的话需要搭建文件共享服务器)

  Replicated LevelDB Store 基于复制的LevelDB Store

1.共享存储原理:(获取排他锁才可以提供消息服务)--简单方式

    

2.Replicated LevelDB Store 基于复制的LevelDB Store原理(基于zookeper)

3.两种方式对比:

  高可用 负载均衡
Master/Slave
Broker Cluster

4.三台机器的完美集群方案:(实现高可用和负载均衡)

2.ActiveMQ集群配置

1.方案介绍

方案如下: B与C采用master-slave共享文件夹存储(任一时刻只有一个可以占有排他锁,也就是只有一个可以提供服务),当A宕机之后B会获取资源锁提供服务---实现高可用

       A与B、A与C都采用Broker 集群,可以同时提供服务,不管B与C谁获取锁,A都可以提供服务(A只能提供服务消费消息,不生成消息)---实现负载均衡

  

配置如下:(同一个电脑不同端口模拟集群)

    

 2.配置文件 (修改的配置文件都是在apache-activemq-5.15.6\conf文件夹下)

  

(1)activemq-a下面的配置:---A服务器

activemq.xml:(注释掉其他协议,服务端口使用61616端口;增加静态网络连接器,同时连接B与C)

jetty.xml:  采用默认的8161端口

(2)activemq-b下面的配置:---B服务器

activemq.xml:(增加与A的静态连接器,修改服务端口采用61617,修改共享文件夹的地址)

jitty.xml:修改端口采用8162

(3)activemq-c下面的配置:---C服务器

activemq.xml:(增加与A的静态连接器,修改服务端口采用61618,修改共享文件夹的地址)

jitty.xml:修改端口采用8163

3.依次启动ActiveMq进行测试

  启动顺序是:A->B->C,下面通过端口信息验证集群:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496TCP    [::]:8161              [::]:0                 LISTENING       2423496liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2423756TCP    [::]:8162              [::]:0                 LISTENING       2423756
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496TCP    127.0.0.1:55338        127.0.0.1:61616        ESTABLISHED     2423756TCP    127.0.0.1:61616        127.0.0.1:55338        ESTABLISHED     2423496TCP    [::]:61616             [::]:0                 LISTENING       2423496liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2423756TCP    127.0.0.1:55345        127.0.0.1:61617        ESTABLISHED     2423496TCP    127.0.0.1:61617        127.0.0.1:55345        ESTABLISHED     2423756TCP    [::]:61617             [::]:0                 LISTENING       2423756
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618

  由于先启动的B,B占有排斥锁,所以B(8162-61617)处于监听状态,而C与B采用共享文件排他锁集群,所以C处于阻塞状态,也就是没有监听端口,被阻塞。

 停掉B服务器,模拟B服务器宕机的情况,再次查看端口:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496TCP    [::]:8161              [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2423848TCP    [::]:8163              [::]:0                 LISTENING       2423848

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496TCP    127.0.0.1:56704        127.0.0.1:61616        ESTABLISHED     2423848TCP    127.0.0.1:61616        127.0.0.1:56704        ESTABLISHED     2423496TCP    [::]:61616             [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618TCP    0.0.0.0:61618          0.0.0.0:0              LISTENING       2423848TCP    127.0.0.1:56791        127.0.0.1:61618        ESTABLISHED     2423496TCP    127.0.0.1:61618        127.0.0.1:56791        ESTABLISHED     2423848TCP    [::]:61618             [::]:0                 LISTENING       2423848

liqiang@root MINGW64 ~/Desktop

  由于B宕机,所以C会强占排他锁,也就是会监听端口,所以可以看到8163与61618端口的监听状态。

  现在模拟C也宕机,将C服务器也停掉。再次查看端口信息:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496TCP    [::]:8161              [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496TCP    [::]:61616             [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617TCP    127.0.0.1:57242        127.0.0.1:61617        SYN_SENT        2423496liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop

  此时A服务器仍然在监听端口。

  上面证明集群搭建成功,下面重新开启A->B->C服务器开始程序验证。

4.程序验证集群

服务器开启顺序性:A->B->C(此时B占有排他锁,B监听端口,A一直监听)

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2428232TCP    127.0.0.1:60520        127.0.0.1:61616        ESTABLISHED     2427192TCP    127.0.0.1:61616        127.0.0.1:60520        ESTABLISHED     2428232TCP    [::]:61616             [::]:0                 LISTENING       2428232

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2427192TCP    127.0.0.1:60513        127.0.0.1:61617        ESTABLISHED     2428232TCP    127.0.0.1:61617        127.0.0.1:60513        ESTABLISHED     2427192TCP    [::]:61617             [::]:0                 LISTENING       2427192

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2428232TCP    [::]:8161              [::]:0                 LISTENING       2428232

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2427192TCP    [::]:8162              [::]:0                 LISTENING       2427192
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163

代码测试:

生产者:  url加了失效策略,而且采用随机选取,生产消息的地址只有B与C服务器地址

package cn.qlq.activemq;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 生产消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:04:41*/
public class MsgProducer {private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1创建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory创建connectionConnection connection = connectionFactory.createConnection();// 3.启动connection
        connection.start();// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建Destination(Queue继承Queue)Queue destination = session.createQueue(queueName);// 6.创建生产者producerMessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {// 7.创建Message,有好多类型,这里用最简单的TextMessageTextMessage tms = session.createTextMessage("textMessage:" + i);// 8.生产者发送消息
            producer.send(tms);System.out.println("send:" + tms.getText());}// 9.关闭connection
        connection.close();}}

消费者:url加了失效策略,而且采用随机选取,生产消息的地址有ABC服务器

package cn.qlq.activemq;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 消费消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:26:41*/
public class MsgConsumer {private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1创建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory创建connectionConnection connection = connectionFactory.createConnection();// 3.启动connection
        connection.start();// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.创建Destination(Queue继承Queue)Queue destination = session.createQueue(queueName);// 6.创建消费者consumerMessageConsumer consumer = session.createConsumer(destination);// 7.给消费者绑定监听器(消息的监听是一个异步的过程,不可以关闭连接,绑定监听器线程是一直开启的,处于阻塞状态,所以可以在程序退出关闭)consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 7.1由于消费者接受的是TextMessage,所以强转一下TextMessage tms = (TextMessage) message;try {System.out.println("接收消息:" + tms.getText());} catch (JMSException e) {e.printStackTrace();}}});}}

启动生产者发布100条消息:

管理界面查看:(消息被发布在B服务器,B占有排他锁,C处于阻塞)

 A服务不生产消息所有A不会消费消息:(通过A查看网络连接器)

关掉B服务器之后查看C服务器:(验证B与C共享同一文件夹,且强占排他锁)

查看共享文件夹:

 此时A与C处于监听状态,启动消费者:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2431676TCP    [::]:8161              [::]:0                 LISTENING       2431676

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2432188TCP    [::]:8163              [::]:0                 LISTENING       2432188TCP    [::1]:8163             [::1]:63600            TIME_WAIT       0TCP    [::1]:8163             [::1]:63648            TIME_WAIT       0TCP    [::1]:8163             [::1]:63649            TIME_WAIT       0TCP    [::1]:8163             [::1]:63650            TIME_WAIT       0TCP    [::1]:8163             [::1]:63651            TIME_WAIT       0TCP    [::1]:8163             [::1]:63652            TIME_WAIT       0

查看控制台如下:连接到A服务器消费信息

 总结:上面的配置方案B与C是为了实现高可用,也就是一台宕机之后另一台马上强占排他锁提供服务(需要共享文件夹实现共用同一文件夹下的资源与锁),B与C可以生产消息,也可以提供消费消息,但是同一时刻只有一个提供服务。

     提供A是为了与B、C实现负载均衡,A不生产消息,但是可以消费消息,替B或者C分担压力。

转载于:https://www.cnblogs.com/qlqwjy/p/9728425.html

ActiveMQ集群相关推荐

  1. ActiveMQ — 集群 — 安装与配置

    2019独角兽企业重金招聘Python工程师标准>>> 1. 服务器配置 推荐集群的数量为基数,并且在这些机器上安装JDK,配置环境变量. 2. 集群方式 常用的集群方式有三种: 基 ...

  2. 使用jmeter对ActiveMQ集群性能方案进行评估--转载

    原文地址:http://www.51testing.com/html/78/23978-143163.html 1.测试概要 1.1 关于 这篇文档中涉及的基于JMS的消息系统能为应用程序提供可靠的, ...

  3. 分布式ActiveMQ集群--转载

    原文地址:http://shensy.iteye.com/blog/1752529 回顾总结前一段时间学习的ActiveMQ分布式集群相关的知识,分享出来希望对看到的人有所帮助. 一.分布式Activ ...

  4. ActiveMQ 集群部署

    ActiveMQ 集群部署 本章演示 ActiveMQ 集群部署,默认您已经安装了 zookeeper 集群,并在各服务器上成功安装了 ActiveMQ 单节点实例如果您的环境还不满足条件请参考前面的 ...

  5. ActiveMQ集群架构与原理解析

    初识JMIS与其专业术语 小伙伴们大家好,现在我们和大家一起了解一下古老而又神秘的消息中间件"ActiveMQ".首先,说起ActiveMQ,就必须先聊聊JMS (Java Mes ...

  6. centos activemq 集群配置 Networks of Brokers

    1.安装JDK运行环境 #cd /opt#wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=a ...

  7. linux activemq修改端口号,linux下 activemq集群配置

    1.简述:回想老王打电话讲故事案例. 2.优势:解耦,异步,横向扩展,顺序保障,安全可靠... 3.JMS(java message service),是java平台中关于面向消息中间件的API,用于 ...

  8. activemq mysql集群配置_ActiveMQ专题--集群,高可用方案

    ActiveMQ集群部署方式 Master-Slave部署方式 Broker-Cluster部署方式 Master-Slave与Broker-Cluster相结合的部署方式 Shared databa ...

  9. ActiveMQ的几种集群配置

    ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很 ...

最新文章

  1. [skill] vim 操作多个window
  2. TensorFlow快餐教程:程序员快速入门深度学习五步法
  3. MySQL复制经常使用拓扑结构具体解释
  4. android中自适应布局教程,Android自适应布局设计技巧
  5. 大型互联网公司必考java面试题与面试技巧
  6. 敏捷转型历程 - Sprint3 回顾会
  7. php xml写入数据库中,PHP读取xml并写入数据库示例
  8. [翻译] C# 8.0 新特性
  9. 36产生用户恶情绪和报复情绪的原因
  10. AI医疗 | 新开源计算机视觉技术用于新生儿胎龄估计
  11. 【codevs2287】火车站,第一个A掉的钻石题(迟来的解题报告)
  12. kinect获取实时深度数据
  13. [Java] Lambda表达式
  14. 基于 AngularJS 的 UI 框架 Suave UI
  15. Linux服务器挂载ntfs硬盘,Linux中挂载NTFS格式的硬盘的方法
  16. 计算机太卡了怎么解决,电脑太卡怎么办最有效
  17. 幂函数在计算机中怎么下,对数指数幂函数模拟计算机.doc
  18. conda创建Python虚拟环境
  19. 三合一剪弦器怎么用_吉他换弦时多余的弦用什么工具剪掉?
  20. 操作系统中断概念梳理

热门文章

  1. 在Linux上自动调整屏幕亮度保护眼睛
  2. Java的对象初始化过程
  3. 为什么单例模式需要double check
  4. NSString ,NSMutableString用法以及一些常用方法
  5. 网站测试自动化系统—系统应该有的功能
  6. cambridge map
  7. Node.js 第二个思路
  8. valid, satisfiable, unsatisfiable的例子
  9. 转:手机流畅的决定性因素
  10. string与stringBuilder的效率与内存占用实测