使用ZooKeeper实现的Master-Slave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案。

原理:

1 使用ZooKeeper(集群)注册所有的ActiveMQ Broker。

2 只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于待机状态,被视为Slave。

3 。如果Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节

点,继续对外提供服务。

官方文档:http://activemq.apache.org/replicated-leveldb-store.html

部署方案

(1)首先我们下载apache-activemq-5.11.1-bin.tar.gz上传到我们的机器上准备部署。

(2)Zookeeper方案

主机IP 消息端口 通信端口 节点目录/root/下
192.168.98.95(hostname:node1 2181 2888:3888 zookeeper-3.4.9
192.168.98.96(hostname:node2 2181 2888:3888 zookeeper-3.4.9
192.168.98.97(hostname:node3 2181 2888:3888 zookeeper-3.4.9

(3)ActiveMQ方案

主机IP
消息端口
集群通信端口 控制台端口 节点目录/root/下
192.168.98.95(hostname:node1
51511
62621 8161 activemq-cluster/node1/
192.168.98.96(hostname:node2 51512 62622 8162 activemq-cluster/node2/
192.168.98.97(hostname:node3 51513 62623 8163 activemq-cluster/node3/

1 首先搭建zookeeper环境,

2 继续搭建activemq环境

2.1 在192.168.98.95节点下,创建/root/activemq-cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件,然后对解压好的文件改名,操作如下:

1 命令:mkdir activemq-cluster

2 命令:tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C /root/activemq-cluster

3 命令:cd  /root/activemq-cluster

4 命令:mv apache-activemq-5.11.1 node1

如此操作,再次反复解压apache-activemq-5.11.1-bin.tar.gz文件到192.168.98.96和192.168.98.97的/root/activemq-cluster/下,分别对应建立node2和node3文件夹。

我们现在已经解压好了三台机器的三个mq节点也就是node1、node2、node3,下面我们要做的事情就是更改每个节点不同的配置和端口。

2.2 修改配置

2.2.1 修改控制台端口(默认为8161),在mq安装路径下的conf/jetty.xml进 行修改即可。(三个节点都要修改,并且端口都不同)

# cd /root/activemq-cluster/node1/conf/
# vim /root/activemq-cluster/node1/conf/jetty.xml

node2和node3分别修改为8162和8163

2.2.2 集群配置文件修改:我们在mq安装路径下的conf/activemq.xml进行修   改其中的持久化适配器,修改其中的bind、zkAddress、hostname、zkPath。然后也需要修改mq的brokerName,并且每个节点名称都必须相同。

#vim /root/activemq-cluster/node1/conf/activemq.xml

第一处修改:brokerName=“jeff-activemq-cluster”(三个节点都需要修改)

第二处修改:先注释掉适配器中的kahadb

第三处修改:添加新的leveldb配置如下(三个节点都需要修改):

Node1:

<persistenceAdapter><replicatedLevelDBdirectory="${activemq.data}/leveldb"replicas="3"bind="tcp://192.168.98.95:62621"zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"hostname="node1"zkPath="/activemq/leveldb-stores" /></persistenceAdapter>

Node2:

<persistenceAdapter><replicatedLevelDBdirectory="${activemq.data}/leveldb"replicas="3"bind="tcp://192.168.98.96:62622"zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"hostname="node2"zkPath="/activemq/leveldb-stores" /></persistenceAdapter>

Node3:

<persistenceAdapter><replicatedLevelDBdirectory="${activemq.data}/leveldb"replicas="3"bind="tcp://192.168.98.97:62623"zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"hostname="node3"zkPath="/activemq/leveldb-stores" /></persistenceAdapter>

第四处修改:(修改通信的端口,避免冲突)
             # vim /root/activemq-cluster/node1/conf/activemq.xml

修改这个文件的通信端口号,三个节点都需要修改(51511,51512,51513)

Ok,到此为止,我们的activemq集群环境已经搭建完毕!

3  测试启动activemq集群
第一步:启动zookeeper集群,命令:zkServer.sh start
第二步:启动mq集群:顺序启动mq:命令如下:
/root/activemq-cluster/node1/bin/activemq start(关闭stop)
/root/activemq-cluster/node2/bin/activemq start(关闭stop)
/root/activemq-cluster/node3/bin/activemq start(关闭stop)
第三步:查看日志信息:
tail -f /root/activemq-cluster/node1/data/activemq.log
tail -f /root/activemq-cluster/node2/data/activemq.log
tail -f /root/activemq-cluster/node3/data/activemq.log
如果不报错,我们的主从配置启动成功,可以使用控制台查看!

第四步:在代码中对集群的brokerUrl配置进行修改即可:

failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false

查看mq主节点

既然zookeeper+activemq主从环境已经搭建完毕,那么究竟哪个机器activemq是主节点?哪个机器activemq是从节点?

我们打开zookeeper:

可以看到在zk的根节点下有个activemq/leveldb-stores节点,它下边有三个子节点,表示我们的三个activemq主机注册的节点,而主节点就是含有"eleced":"0000000003"这个属性的节点,而另外的子节点没有这个属性值:

主节点是node1,192.168.98.95,然后我们打开控制台:

http://192.168.98.96:8162/admin/queues.jsp

http://192.168.98.97:8163/admin/queues.jsp

发现并没有界面服务可以提供

打开:http://192.168.98.95:8161/admin/queues.jsp,界面正常访问:

应用测试

我们连接到activemq的主备集群名称为first的队列去发送50万条消息,每隔1s发送一次,然后启动消费者接收。

Sender.java

package jeff.mq.master;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Sender {public static void main(String[] args) throws Exception {final Sender s = new Sender();s.sender();}public void sender() throws Exception{/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务//     Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("first");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageProducer messageProducer = session.createProducer(null);/*** 第六步:* 我们可以使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode)*///如果设置为持久话方式,我们需要指定具体持久话策略,比如jdbc持久化到数据库
//      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);/*** 第七步:* 最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,* 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection*/for (int i = 0; i < 500000; i++) {TextMessage textMessage = session.createTextMessage();int n = (int)(Math.random()*10);textMessage.setText("我是消息,Id:"+n);/*** 参数解释:* 第一个参数:目的地* 第二个参数:消息* 第三个参数:是否持久化* 第四个参数:优先级0~9,0-4是普通消息,5-9是加急消息* 第五个参数:存活时间,这里我们设置的是2分钟*/messageProducer.send(destination,textMessage,DeliveryMode.NON_PERSISTENT,0,1000*60);System.out.println("发送消息:"+textMessage.getText());Thread.sleep(1000);}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}
}

receiver.java

package jeff.mq.master;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** @author jeffSheng* 2018年7月3日*/
public class Receiver {public static void main(String[] args) throws Exception {/*** 第一步:* 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均* 使用默认即可,默认端口“tcp://localhost:61616”*/ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false");/*** 第二步:* 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法* 开启连接,Connection连接默认是关闭的。*/Connection connection = connectionFactory.createConnection();connection.start();/*** 第三步:* 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,* 参数配置2为签收模式,一般我们设置自动签收。*///我们这里不开启事务Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** 第四步:* 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,* 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题* 在程序众包给可以使用多个Queue和Topic*/Destination destination = session.createQueue("first");/*** 第五步:* 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)* MessageProcuder/MessageConsumer*/MessageConsumer messageConsumer = session.createConsumer(destination);while(true){TextMessage msg = (TextMessage)messageConsumer.receive();if(msg==null)break;System.out.println("收到内容: "+msg.getText());}//关闭方法会递归向下关闭会话等连接if(connection!=null){connection.close();}}}

当node1宕机后,node2和node3中有一个会成为maser,而node1恢复后会进行再次选举!

留个疑问:

      在测试的过程中发现生产者发送的数据无法入队!

---忙活了半个晚上加一个早上终于找到原因了,是linux的时间跟我windows的系统时间不一致,linux的时间快了,导致我windows消息设置的存活时间很快就过去了,发送的消息全部过期,我说怎么在topics的activemq.advisor.expire.queue.two中不停增长,而queue界面毛都没有!


调整了linux的时间,重启zk+mq,终于给我出来了!


linux校对时间方法:

 yum install ntp

 ntpdate cn.pool.ntp.org


失败转移

现在zk中的主activemq服务器是node1:


我们先启动服务让生产者和消费者正常运行,然后我们停掉node1:


可以看到eclipse控制台消费者打印的消息停滞了一小会儿,停在id=50


接着继续打印了!


我们看下现在的zk节点:主节点变成了node2,而node1已经不存在了


我们现在重启node1,



zk多了一个00000000059节点就是node1,但是主节点仍然是node2!activemq的选主逻辑应该是zk监听顺序临时节点的原理。


成功进行了failover失败转移!


负载均衡配置

很简单:

集群1链接集群2:
<networkConnectors>
<networkConnector
uri="static:(tcp://192.168.1.112:51514,tcp://192.168.1.112:51515,tcp://192.168.1.112:51516)"
duplex="false"/>
</networkConnectors>

就是在集群1中连接集群2的uri连接.

循序渐进ActiveMQ(6)----使用zookeeper实现activemq的主从环境搭建相关推荐

  1. ZooKeeper 3.4.5 分布式环境搭建详解

    概述 上一篇中,我们说到了关于 Hadoop-2.2.0 集群的搭建.在这个系列中,Zookeeper 是必不可少的.本文会介绍 Zookeeper-3.4.5 的安装,后面会介绍 HBase-0.9 ...

  2. 大数据 -- zookeeper和kafka集群环境搭建

    一 运行环境 从阿里云申请三台云服务器,这里我使用了两个不同的阿里云账号去申请云服务器.我们配置三台主机名分别为zy1,zy2,zy3. 我们通过阿里云可以获取主机的公网ip地址,如下: 通过secu ...

  3. ZooKeeper 3.4.5 分布式环境搭建

    zookeeper是Hadoop的一个子项目,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护.名字服务.分布式同步.组服务等.zookeeper的目标是封装好复杂.易出错的关键服务 ...

  4. tomcat,zookeeper,activeMQ,Kafka设置jvm参数

    1,tomcat设置jvm参数 设置方法: 在tomcat bin 目录增加配置:setenv.sh #add tomcat pid CATALINA_PID="$CATALINA_BASE ...

  5. activemq 各种版本区别_Apache ActiveMQ 各个版本所依赖的JDK版本

    ActiveMQ下载地址 http://activemq.apache.org/download-archives.html ActiveMQ 依赖JDK版本 MQ版本号                ...

  6. ActiveMQ入门-ActiveMQ环境搭建

    http://activemq.apache.org/从阿帕奇的官方下载,ActiveMQ的环境搭建依赖于JDK,建议使用1.8 解压缩就能用,执行bin文件夹下面的可执行文件 ➜ ~ cd /Use ...

  7. ActiveMQ连接数过多,导致ActiveMQ无法正常接入数据

    ActiveMQ跑了一段时间,会出现连接数据过多的报错 Could not accept connection : org.apache.activemq.transport.tcp.Exceeded ...

  8. Linux 下的 ActiveMQ C++ 环境搭建与测试

    Linux 下的 ActiveMQ C++ 环境搭建与测试 一.下载安装jdk jdk官网下载地址:https://www.oracle.com/technetwork/java/javase/dow ...

  9. activemq原理 java_面试题activemq的原理详解

    在面试中经常会碰到activemq的问题,相信小伙伴们都挺烦恼的吧,这里小编精心整理了一些关于activemq的面试题,下面快一起来看看吧. 一.activemq是什么? activeMQ是一种开源的 ...

  10. 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)

    高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...

最新文章

  1. 构建之法阅读笔记01
  2. 细数人体十大最“无用”的器官。
  3. C语言经典例16-最大公约数和最小公倍数
  4. Spring AOP 简介以及简单用法
  5. WeChat报错navigateTo:fail can not navigateTo a tabbar page
  6. Brocade NOS学习笔记(第一章——第三章)
  7. maven依赖下载失败问题
  8. 【AIX】用户、组合安全管理
  9. web前端----------网易云音乐播放器简单的实现(素材自行下载)
  10. abb机器人编程指令goto指令_ABB机器人~编程基本指令之运动指令
  11. 图片快速转化为Excel表格
  12. 2021-4-8 【链表】【】
  13. 读《七人分粥》悟管理之道
  14. 机器学习中的小数学知识
  15. 《区块链原理、设计与应用》 – 基于超级账本 Fabric 2.x(学习分享2.1-HyperLedger项目细分)
  16. 真题集P110---2018年真题
  17. MQ-2气体传感器特性解析
  18. 手机网页html怎样阻止广告,Webview拦截广告
  19. 改革春风吹满地(HDU_2036)
  20. 新手程序员想兼职却没有渠道?来看资深程序员在哪接单!

热门文章

  1. android 分享到qq黑屏,Android 第三方登录 QQ登录Android 10系统 出现黑屏问题
  2. 运维工程师故障排查思路(备忘)
  3. [5G学习]01-5G无线接口架构介绍
  4. 斯蒂夫乔布斯传札记:第八波
  5. 明日方舟如何在电脑上玩 明日方舟模拟器教程
  6. 介绍PS工具“仿制图章工具”和“图案图章工具”
  7. 红帽的RHCSA、RHCE、RHCA值得报考吗?
  8. Python dataframe绘制饼图_基于Python的图表绘图系统matplotlib,“饼图“”你真了解吗?...
  9. 北洋园pt---一个好用的pt网站
  10. hackthebox - blunder (Bludit渗透cewl使用 sudo提权)