作为一个消息中间件,有客户端和服务端两部分代码,这次的源码解析系列主要从客户端的代码入手,分成建立连接、消息发送、消息消费三个部分。趁着我昨天弄明白了源码编译的兴奋劲头还没过去,今天研究一下建立连接的部分。

如果读起来吃力,代码部分可以略过,我把主要的功能点给加粗。

通常来说,客户端使用MQ的API建立时,可以分成两个步骤:

  1. 对于连接的配置,比如服务器IP地址,用户名和密码等等
  2. 建立连接并启动
    客户端示例代码:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
ActiveMQConnection connection = connectionFactory.createConnection();
connection.start();

可以看到主要的方法是ActiveMQConnectionFactory的构造函数,和createConnection(),以及connection中的start()方法。

ActiveMQConnectionFactory中的createConnection
构造函数比较简单,直接把传入的用户名密码和url放在变量里

public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {setUserName(userName);setPassword(password);setBrokerURL(brokerURL.toString());
}

createConnection方法指向了createActiveMQConnection方法,该方法中主要做的事情有三个:

  1. 建立Transport和通过Transport建立Connection
  2. 配置Connection,建立好的Transport对象会被放到Connection对象中
  3. 启动Transport
//建立Transport和通过Transport建立Connection
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);
//配置
connection.setUserName(userName);
connection.setPassword(password);
configureConnection(connection);
//启动Transport
transport.start();

configureConnection(connection);这个方法的作用是对实例化出的ActiveMQConnetion对象中的参数的一系列配置,代码有点长就不上了。
对于我们来说其实主要想看的是连接是如何建立起来的,也就是

Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);

createTransport();方法中包含了对客户端传入的url的初步校验,主要是验证URL的合法性,而后调用工厂类TransportFactory.connection(url)来进行连接的建立。

我们客户端在建立连接的时候,有可能有TCP、UDP等等协议,AMQ实现了简单工厂类FactoryFinder,在TransportFactory.connection(url)方法中,先是通过FactoryFinder根据用户输入的url(比如tcp://192.168.0.1)来找到使用的协议工厂TcpTransportFactory,然后使用TcpTransportFactory中的类来进行连接的建立。这个过程从代码上来看有点曲折:

  1. TransportFactory的connect()调用findTransportFactory方法
  2. findTransportFactory调用FactoryFinder类的newInstance方法
  3. newInstance调用ObjectFactory类的create方法
  4. ObejctFactory是一个接口类,实现类是StandaloneObjectFactory,其中的create方法调用自身的loadClass方法
  5. loadClass方法中最终找到正确的类,返回至TransportFactory中
  6. 如果是tcp连接,最终得到的就是一个实例化的TcpTransportFactory类
public abstact class TransportFactory {
……private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");public static Transport connect(URI location) throws Exception {TransportFactory tf = findTransportFactory(location);return tf.doConnect(location);}public static TransportFactory findTransportFactory(URI location) throws IOException {//拆分urlString scheme = location.getScheme();if (scheme == null) {throw new IOException("Transport not scheme specified: [" + location + "]");}TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);if (tf == null) {// 调用FactoryFinder找到正确的TransportFactorytry {tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);TRANSPORT_FACTORYS.put(scheme, tf);} catch (Throwable e) {throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);}}return tf;}
……
}
public class FactoryFinder {
……//通过ObjectFactory来找到正确的TransportFactorypublic Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {return objectFactory.create(path+key);}
……
}

ObjectFactory的设计也是很有趣的。AMQ在代码中的说法是之所以这么实现是因为这样如果用户想自己写一个ObjectFactory,也可以支持。

/*** The strategy that the FactoryFinder uses to find load and instantiate Objects* can be changed out by calling the* {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}* method with a custom implementation of ObjectFactory.** The default ObjectFactory is typically changed out when running in a specialized container* environment where service discovery needs to be done via the container system.  For example,* in an OSGi scenario.*/public interface ObjectFactory {/*** @param path the full service path* @return*/public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;}

Anyway,我们现在通过这么曲折的过程得到了一个实例化的TcpTransportFactory对象,下一步应该是调用doConnect(url)方法进行连接的建立了。因为TcpTransportFactory继承了TransportFactory类,doConnect方法仍然是在TransportFactory中的:

public Transport doConnect(URI location) throws Exception {try {//把url里关于Transport的配置提取出来,WireFormat基本都可以看成是url的配置。//如果使用Openwire(默认协议),那么WireFormat就是openwire的相关配置。//见http://activemq.apache.org/configuring-wire-formats.htmlMap<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));if( !options.containsKey("wireFormat.host") ) {options.put("wireFormat.host", location.getHost());}WireFormat wf = createWireFormat(options);//建立socket连接Transport transport = createTransport(location, wf);//装饰者模式,在连接上包装上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四个功能Transport rc = configure(transport, wf, options);//remove autoIntrospectionSupport.extractProperties(options, "auto.");if (!options.isEmpty()) {throw new IllegalArgumentException("Invalid connect parameters: " + options);}return rc;} catch (URISyntaxException e) {throw IOExceptionSupport.create(e);}}

这个方法中主要有三个重要功能:

  1. 配置wireformat
  2. 建立TcpTransport连接
  3. 在连接上包装四大辅助功能
    其中配置WireFormat可以不看,建立TcpTransport其实是在调用createTransport(location, wf);时引用了TcpTransport的构造函数,代码如下:
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,URI localLocation) throws UnknownHostException, IOException {this.wireFormat = wireFormat;this.socketFactory = socketFactory;try {this.socket = socketFactory.createSocket();} catch (SocketException e) {this.socket = null;}this.remoteLocation = remoteLocation;this.localLocation = localLocation;this.initBuffer = null;setDaemon(false);}

这里直接调用了socketFactory.createSocket();,使用的是默认的方法,所以无法指定本地网卡建立连接。我看了下其实可以用socketFactory.createSocket(host, port, localHost, localPort)来改写,改写后就可以指定本地IP和端口了。

此外,查了下网络上的资料,四大辅助后续再看:

MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。

WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。

InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。

ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。

ActiveMQConnection的Start()
在使用AMQ的过程中,很多用户问我为什么Connection需要start(),不能在createConnection的时候直接start了吗?而且不调用start方法其实不影响发送,但是会影响接收。抱着这样的疑惑,我们来看一下源码。

 /*** Starts (or restarts) a connection's delivery of incoming messages. A call* to <CODE>start</CODE> on a connection that has already been started is* ignored.** @throws JMSException if the JMS provider fails to start message delivery*                 due to some internal error.* @see javax.jms.Connection#stop()*/@Overridepublic void start() throws JMSException {checkClosedOrFailed();ensureConnectionInfoSent();if (started.compareAndSet(false, true)) {for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {ActiveMQSession session = i.next();session.start();}}}

源码里直接对start方法加了注释,说明start就是启动connection可以接收消息的功能。其实源码里可以很明显看出来start里包含了几个步骤:

  1. 检查连接是否关闭或失效
  2. 确认客户端的ConnectionInfo是否被送到服务器
  3. 启动这个Connection中的每一个Session

我好奇的是第二步,看看源码

 /*** Send the ConnectionInfo to the Broker** @throws JMSException*/protected void ensureConnectionInfoSent() throws JMSException {synchronized(this.ensureConnectionInfoSentMutex) {// Can we skip sending the ConnectionInfo packet??if (isConnectionInfoSentToBroker || closed.get()) {return;}//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?if (info.getClientId() == null || info.getClientId().trim().length() == 0) {info.setClientId(clientIdGenerator.generateId());}syncSendPacket(info.copy(), getConnectResponseTimeout());this.isConnectionInfoSentToBroker = true;// Add a temp destination advisory consumer so that// We know what the valid temporary destinations are on the// broker without having to do an RPC to the broker.ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());if (watchTopicAdvisories) {advisoryConsumer = new AdvisoryConsumer(this, consumerId);}}}

从源码里还能看到讨论和待办……我觉得我已经深入核心了……这个方法里做了两件事,

发送ConnectionInfo的数据包到服务端,如果info里没有用户自己设定的clientID,还会自动帮忙生成一个。发送的时候调用的是syncSendPacket方法,很明显是个同步发送,需要服务端同步返回response才算发送成功,我理解这里应该是一个试探连接的步骤。
建立一个通往临时目的地的消费者。所以其实每一个ActiveMQConnection的连接中都自动包含了一个消费者。我临时写了个客户端试了下,的确是存在的。

奇葩的是我就算不调用connection.start()方法,直接发送消息,这个临时消费者也是存在的,所以肯定是在消息发送的时候的哪个地方调用了connection的start方法。

至于为什么不调用start()方法就无法消费,我看了下session的start方法:

/*** Start this Session.** @throws JMSException*/protected void start() throws JMSException {started.set(true);for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {ActiveMQMessageConsumer c = iter.next();c.start();}executor.start();}

原来是在session的start方法里启动了这个session里的consumer,想想session的建立过程的确是不需要调用session.start方法的,但是我们一般是先调用start方法,而后建立consumer,这个逻辑顺序还是有点错乱……
等下一次研究接收端的源码时再深入吧。

本次发现的源码优化点

  1. socket建立时,使用不同的createSocket方法,指定本机IP和端口。
  2. 项目用到了advisory message,每当agent建立/断开连接的时候,ActiveMQ.Advisory.Connection中会生成一条消息,这个消息中带上了ConnectionInfo。项目就是使用这个来即时检测agent的在线和离线状态的。因此如果我们改一下ConnectionInfo,加上agent的一些重要信息,比如agent版本号,操作系统类型,真实IP地址等等,会在获取agent信息的即时性上得到很大的提高。

我真的去试了一下……在ConnectionInfo里添加了一条test=test,然后重新编译服务端和客户端的依赖jar包,开启MQ的logging plugins,并且用客户端去监听了一下ActiveMQ.Advisory.Connection,得到了这样的结果。

ConnectionInfo {commandId = 1,
responseRequired = true,
connectionId = ID:Air.local-51230-1502000963732-1:1,
clientId = ID:Air.local-51230-1502000963732-0:1,
clientIp = tcp://127.0.0.1:51231,
userName = null, password = *****,
test = test,
brokerPath = null,
brokerMasterConnector = false,
manageable = true,
clientMaster = true,
faultTolerant = true,
failoverReconnect = false}

ActiveMQ源码解析 建立连接相关推荐

  1. OkHttp3源码解析(三)——连接池复用

    OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...

  2. paho mqtt java_MQTT之Eclipse.Paho源码(一)--建立连接

    写在前面 通过上一个章节MQTT系列---Java端实现消息发布与订阅的介绍,我们已经基本构建出一个可以简单通信的MQTT生产和消费服务,并且具备基本的发布/订阅消息功能.那么从本章开始,我们将从源代 ...

  3. dubbo源码解析-zookeeper创建节点

    前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码 ...

  4. Dubbo 实现原理与源码解析系列 —— 精品合集

    摘要: 原创出处 http://www.iocoder.cn/Dubbo/good-collection/ 「芋道源码」欢迎转载,保留摘要,谢谢! 1.[芋艿]精尽 Dubbo 原理与源码专栏 2.[ ...

  5. Dubbo源码解析 —— Zookeeper 订阅

    作者:肥朝 原文地址:https://www.jianshu.com/p/73224a6c07bb 友情提示:欢迎关注公众号[芋道源码].????关注后,拉你进[源码圈]微信群和[肥朝]搞基嗨皮. 友 ...

  6. 客户连接多个服务端_Dubbo源码解析之客户端Consumer

    前面我们学习了Dubbo源码解析之服务端Provider.对服务提供方进行思路上的讲解,我们知道以下知识点.本篇文章主要对消费方进行讲解.废话不多说请看下文. 如何将对象方法生成Invoker 如何将 ...

  7. 谷歌BERT预训练源码解析(二):模型构建

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/weixin_39470744/arti ...

  8. 彻底理解OkHttp - OkHttp 源码解析及OkHttp的设计思想

    OkHttp 现在统治了Android的网络请求领域,最常用的框架是:Retrofit+okhttp.OkHttp的实现原理和设计思想是必须要了解的,读懂和理解流行的框架也是程序员进阶的必经之路,代码 ...

  9. spring 注解试事物源码解析

    spring 注解试事物源码解析 基于xml注解式事务入口 public class TxNamespaceHandler extends NamespaceHandlerSupport {stati ...

最新文章

  1. c语言中小数乘法怎样写程序,四年级下册lbrack;小数乘法rsqb;知识点归纳
  2. BZOJ 2947 Poi2000 促销 set
  3. 第二阶段冲刺10天 第五天
  4. JZOJ 4421. aplusb
  5. 网站计数器 web映射
  6. 云解析DNS能为你做什么?
  7. 雅士利牵手阿里云实现新零售改造,双11全渠道成交金额同比增长超过200%
  8. RabbitMQ的5种队列_通配符模式_入门试炼_第8篇
  9. java token redis生成算法_Redis实现单点登录
  10. vit-pytorch
  11. Python之网络爬虫(XML与HTML与JSON文件、urllib与request的用法)
  12. vscode取消底部横滚动条(自动换行)
  13. [转]使用T4模板批量生成代码
  14. WebCollector
  15. mouseover 和 mouseenter的区别
  16. php excel导出柱状图,YII2框架下使用PHPExcel导出柱状图
  17. string entitlement = Application.dataPath+ “/Editor/Entitle Unity工程到处iOS工程,用脚本把Push Notifications打开
  18. 用MLX90614红外温度传感器制作非接触式红外测温仪
  19. linux下使用C语言实现MQTT通信(三丶总结经验)
  20. Derek Sivers:砍掉一切没有惊讶感的内容(译)

热门文章

  1. python画二次函数图像的顶点_画二次函数图像的步骤
  2. php websocket 是否在线_看完让你彻底理解WebSocket原理,附实战代码(包含前端和后端)...
  3. 云服务器开启ftp_用云服务器怎么挂机器人
  4. 中等职业计算机等级考试,中等职业学校计算机等级考试题库(含答案):EXCEL
  5. matlab调用Java程序时出现 Java.lang.OutOfMemoryErrot: GC overhead limit exceeded
  6. FireDAC 中文字段过滤问题
  7. C++ class、struct区别
  8. spring-boot 速成(8) 集成druid+mybatis
  9. tortoisegit推送ssh-key需要输入用户信息
  10. unbutu安装搜狗输入法【转载】