2019独角兽企业重金招聘Python工程师标准>>>

一、架构和技术介绍

1、简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

2、activemq的特性

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBC和journal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

3、下载和安装ActiveMQ

1、下载

ActiveMQ的最新版本是5.10.0,但由于我们内网下载存在问题,所以目前通过内网只能下载到5.9.0,下载地址:http://activemq.apache.org/activemq-590-release.html。

2、安装

如果是在windows系统中运行,可以直接解压apache-activemq-5.9.0-bin.zip,并运行bin目录下的activemq.bat文件,此时使用的是默认的服务端口:61616和默认的console端口:8161。

如果是在linux或unix下运行,在bin目录下执行命令:./activemq setup

3、修改ActiveMQ的服务端口和console端口

A、修改服务端口:打开conf/activemq.xml文件,修改以下红色字体部分

<transportConnectors>

<transportConnector name="openwire"                         uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>

</transportConnectors>

B、修改console的地址和端口:打开conf/jetty.xml文件,修改以下红色字体部分

<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">

<property name="port" value="8162"/>

</bean>

4、通过客户端代码试用ActiveMQ

需要提前将activemq解压包中的lib目录下的相关包引入到工程中,再进行如下编码:

1、发送端的代码:

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {private static final int SEND_NUMBER = 5;public static void main(String[] args) {// ConnectionFactory:连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;// Connection:JMS客户端到JMS Provider的连接Connection connection = null;// Session:一个发送或接收消息的线程Session session;// Destination:消息的目的地;消息发送给谁.Destination destination;// MessageProducer:消息发送者MessageProducer producer;// TextMessage message;//构造ConnectionFactory实例对象,此处采用ActiveMq的实现jarconnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");try {//构造从工厂得到连接对象connection =connectionFactory.createConnection();//启动connection.start();//获取操作连接session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//获取sessiondestination = session.createQueue("FirstQueue");//得到消息生成者【发送者】producer =session.createProducer(destination);//设置不持久化,此处学习,实际根据项目决定producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//构造消息,此处写死,项目就是参数,或者方法获取sendMessage(session, producer);session.commit();} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}publicstaticvoid sendMessage(Session session,MessageProducer producer)throws Exception {for (int i = 1; i <=SEND_NUMBER; i++) {TextMessage message = session.createTextMessage("ActiveMq发送的消息" + i);//发送消息到目的地方System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);producer.send(message);}}}

2、接收端代码:

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Receive {public static void main(String[] args) {// ConnectionFactory:连接工厂,JMS用它创建连接ConnectionFactory connectionFactory;// Connection:JMS客户端到JMS Provider的连接Connection connection = null;// Session:一个发送或接收消息的线程Session session;// Destination:消息的目的地;消息发送给谁.Destination destination;//消费者,消息接收者MessageConsumer consumer;connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");try {//构造从工厂得到连接对象connection =connectionFactory.createConnection();//启动connection.start();//获取操作连接session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//获取sessiondestination = session.createQueue("FirstQueue");consumer =session.createConsumer(destination);while (true) {//设置接收者接收消息的时间,为了便于测试,这里谁定为100sTextMessage message =(TextMessage) consumer.receive(100000);if (null != message) {System.out.println("收到消息" + message.getText());} else {break;}}} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}}

3、通过监控查看消息堆栈的记录:

登陆http://localhost:8162/admin/queues.jsp,默认的用户名和密码:admin/admin

二、ActiveMQ的多种部署方式

单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。

1、Master-Slave部署方式
1)shared filesystem Master-Slave部署方式

主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。

多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

2)shared database Master-Slave方式

与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库而已。

3)Replicated LevelDB Store方式

这种主备方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master。被选择的master broker node开启并接受客户端连接。

其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves。

如果master死了,得到了最新更新的slave被允许成为master。fialed node能够重新加入到网络中并连接master进入slave mode。所有需要同步的disk的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3,那么法定大小是(3/2)+1=2. Master将会存储并更新然后等待 (2-1)=1个slave存储和更新完成,才汇报success。至于为什么是2-1,熟悉Zookeeper的应该知道,有一个node要作为观擦者存在。

单一个新的master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node。这个node将会成为新的master。因此,推荐运行至少3个replica nodes,以防止一个node失败了,服务中断。

2、Broker-Cluster部署方式

前面的Master-Slave的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。Broker-Cluster的部署方式就可以解决负载均衡的问题。

Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue。当broker-A上面指定的queue-A中接收到一个message处于pending状态,而此时没有consumer连接broker-A时。如果cluster中的broker-B上面由一个consumer在消费queue-A的消息,那么broker-B会先通过内部网络获取到broker-A上面的message,并通知自己的consumer来消费。

1)static Broker-Cluster部署

在activemq.xml文件中静态指定Broker需要建立桥连接的其他Broker:

1、  首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>

</networkConnectors>

2、  修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3、  在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4、  修改Broker-A节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5、分别启动Broker-A和Broker-B。

2)Dynamic Broker-Cluster部署

在activemq.xml文件中不直接指定Broker需要建立桥连接的其他Broker,由activemq在启动后动态查找:

1、  首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

2、修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>

</transportConnectors>

3、在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnectoruri="multicast://default"

dynamicOnly="true"

networkTTL="3"

prefetchSize="1"

decreaseNetworkConsumerPriority="true" />

</networkConnectors>

4、修改Broker-B节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>

</transportConnectors>

5、启动Broker-A和Broker-B

2、Master-Slave与Broker-Cluster相结合的部署方式

可以看到Master-Slave的部署方式虽然解决了高可用的问题,但不支持负载均衡,Broker-Cluster解决了负载均衡,但当其中一个Broker突然宕掉的话,那么存在于该Broker上处于Pending状态的message将会丢失,无法达到高可用的目的。

由于目前ActiveMQ官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:

1、部署的配置修改

这里以Broker-A + Broker-B建立cluster,Broker-C作为Broker-B的slave为例:

1)首先在Broker-A节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>

</networkConnectors>

2)修改Broker-A节点中的服务提供端口为61616:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3)在Broker-B节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4)修改Broker-B节点中的服务提供端口为61617:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

6)在Broker-C节点中添加networkConnector节点:

<networkConnectors>

<networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

7)修改Broker-C节点中的服务提供端口为61618:

<transportConnectors>

<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

8)修改Broker-B节点中的持久化方式:

<persistenceAdapter>

<kahaDB directory="/localhost/kahadb"/>

</persistenceAdapter>

9)分别启动broker-A、broker-B、broker-C,因为是broker-B先启动,所以“/localhost/kahadb”目录被lock住,broker-C将一直处于挂起状态,当人为停掉broker-B之后,broker-C将获取目录“/localhost/kahadb”的控制权,重新与broker-A组成cluster提供服务。

来源:海鹏的博客

转载于:https://my.oschina.net/u/1788620/blog/392029

ActiveMQ实现负载均衡+高可用部署方案相关推荐

  1. keepalive+nginx实现负载均衡高可用_高可用、负载均衡 集群部署方案:Keepalived + Nginx + Tomcat...

    前言:初期应用较小,一般以单机部署为主,即可满足业务的需求,随着业务的不断扩大,单机部署的模式无法承载这么大的业务量,需要进行服务集群化的部署,本文主要介绍服务器Tomcat多实例部署,搭载Keepa ...

  2. LVS原理详解及部署之五:LVS+keepalived实现负载均衡高可用

    本文我们主要讲解的是LVS通过keepalived来实现负载均衡和高可用,而不是我们第三篇文章介绍的通过手动的方式来进行配置.通过脚本的方式来显示RS节点的健康检查和LVS的故障切换.此文会通过一个实 ...

  3. linux系统下对网站实施负载均衡+高可用集群需要考虑的几点

    随着linux系统的成熟和广泛普及,linux运维技术越来越受到企业的关注和追捧.在一些中小企业,尤其是牵涉到电子商务和电子广告类的网站,通常会要求作负载均衡和高可用的Linux集群方案. 那么如何实 ...

  4. Open***异地机房互连以及负载均衡高可用解决方案

    架构方案如下: ---Open××× server 搭建部署 1.在Open×××-1 server上安装流程 (1.添加epel源 [root@ShangHai-×××-1 ~]# yum inst ...

  5. saltstack实现haproxy+keepalived负载均衡+高可用(二)

    一键部署haproxy+keepalived实现负载均衡+高可用 实验环境: !!!!    特别注意: www.westos.org为test1的minion名字 test1: 172.25.1.1 ...

  6. Lvs+keepAlived实现负载均衡高可用集群(DR实现)

    第1章 LVS 简介 1.1 LVS介绍 LVS是Linux Virtual Server的简写,意为Linux虚拟服务器,是虚拟的服务器集群系统,可在UNIX/LINUX平台下实现负载均衡集群功能. ...

  7. Nginx+keepalived负载均衡高可用篇第③版

    Nginx+keepalived负载均衡高可用篇第③版 对付中.小型企业,假如没有资金去购买昂贵的四/七层负载均衡交换机,那么Nginx是不错的七层负载均衡选择,并且可以通过Nginx + Keepa ...

  8. LVS+Keepalived-DR模式负载均衡高可用集群

    LVS+Keepalived DR模式负载均衡+高可用集群架构图 工作原理: Keepalived采用VRRP热备份协议实现Linux服务器的多机热备功能. VRRP,虚拟路由冗余协议,是针对路由器的 ...

  9. nginx负载均衡高可用

    1.1   什么是负载均衡高可用 nginx作为负载均衡器,所有请求都到了nginx,可见nginx处于非常重点的位置,如果nginx服务器宕机后端web服务将无法提供服务,影响严重. 为了屏蔽负载均 ...

最新文章

  1. Linux命令查看服务器信息
  2. 60 Permutation Sequence
  3. python 多态 知乎_Python函数接口的一些设计心得
  4. C#中窗口关闭时没有取消事件订阅导致事件重复执行的解决方法
  5. Python学习之路:函数参数及调用
  6. S3C2440中断解析和基于WINCE操作系统的中断分析(整理于网络,用于按键中断使用)
  7. 构件之法读书笔记04
  8. 时间序列趋势判断(二)——Cox-Staut趋势检验
  9. Stanford机器学习---第二讲. 多变量线性回归 Linear Regression with multiple variable
  10. 扫码枪 android 广播,Android 扫码枪监听封装
  11. Android 百度地图反向Geo “PERMISSION UNFINISHED“
  12. 运行Ubuntu的HP笔记本合上盖子不休眠也不断网
  13. 量子计算机解泊松方程,试求泊松方程的解.ppt
  14. 计算机毕业设计Java酒店管理信息系统(源码+mysql数据库+系统+lw文档)
  15. 池化层(汇聚层)的通道变化
  16. MT8732 / MT8735处理器特点/芯片组型号资料介绍
  17. 到底什么是国土空间规划?
  18. The page has expired due to inactivity. Please refresh and try again.
  19. 十位改变世界的人工智能领域大师
  20. 【网络篇】第十七篇——IP协议详解

热门文章

  1. mips汇编计算开方_清华考研辅导班-2020清华大学912计算机专业基础综合考研经验真题参考书目...
  2. flutter 应用场景_【Flutter 1-12】Flutter手把手教程Dart语言——什么是泛型和泛型的使用场景...
  3. 【百战GAN】新手如何开始你的第一个生成对抗网络(GAN)任务
  4. 2021年跨境电商市场怎么样?新手商家入驻还有机会吗?
  5. 中国褐煤行业发展趋势前瞻与十四五战略规划分析报告2022-2028年
  6. 农民代言人谋定农业大健康--万祥军:创业路上功能性农业
  7. cocos2d-x 3.2读取xml和json练习
  8. 【转】vim中将tab自动转换成空格
  9. AJAX面试题:一个页面实现增删改查(ASP.NET实现)
  10. AJAX中的Back Button/Bookmarking问题和Nikhil Kothari的Atlas解决方案