在看完会话的源码后,最近项目提到了Failover机制的疑问,所以需要先读一下failover机制的源码,看看这个故障重连的机制是怎样的。
在建立连接的那篇文章中(ActiveMQ源码解析(一)建立连接),讲到了FactoryFinder会根据schema找到对应的TransportFactory类,这个配置其实是放在transport目录中的。

private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

这种配置方法很值得学习。只需要写一个工厂类继承自TransportFactory,然后在"META-INF/services/org/apache/activemq/transport/"目录下放个配置文件,指向该工厂类就OK了。使用配置的方式新增,而不是在TansportFactory代码中去添加,更好的遵守了OCP原则。

Anyway,我们现在找到了FailoverTransportFactory,下一步是调用doConnect建立连接

public Transport doConnect(URI location) throws IOException {try {//主要看这个Transport transport = createTransport(URISupport.parseComposite(location));//与Tcptransport类似,加上辅助功能transport = new MutexTransport(transport);transport = new ResponseCorrelator(transport);return transport;} catch (URISyntaxException e) {throw new IOException("Invalid location: " + location);}}public Transport createTransport(CompositeData compositData) throws IOException {Map<String, String> options = compositData.getParameters();FailoverTransport transport = createTransport(options);if (!options.isEmpty()) {throw new IllegalArgumentException("Invalid connect parameters: " + options);}transport.add(false,compositData.getComponents());return transport;}public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {FailoverTransport transport = new FailoverTransport();Map<String, Object> nestedExtraQueryOptions = IntrospectionSupport.extractProperties(parameters, "nested.");IntrospectionSupport.setProperties(transport, parameters);try {transport.setNestedExtraQueryOptions(URISupport.createQueryString(nestedExtraQueryOptions));} catch (URISyntaxException e) {}return transport;}

调用了FailoverTransport的构造函数,建立了一个FailoverTransport,可以看到在FailoverTransport的方法里建立了一个名为reconnectTask的TaskRunner,跑的Task里的iterate()方法是重复执行的。具体机制见源码解析(二)。

public FailoverTransport() {brokerSslContext = SslContext.getCurrentSslContext();stateTracker.setTrackTransactions(true);// Setup a task that is used to reconnect the a connection async.reconnectTaskFactory = new TaskRunnerFactory();reconnectTaskFactory.init();reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {@Overridepublic boolean iterate() {boolean result = false;//FailoverTransport被启动才执行该线程if (!started) {return result;}boolean buildBackup = true;synchronized (backupMutex) {//若连接未建立||需要重置集群负载||优先连接的服务器可用且该transport未被stop,则执行if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {result = doReconnect();buildBackup = false;}}//是否启用备用连接if (buildBackup) {buildBackups();if (priorityBackup && !connectedToPriority) {try {doDelay();if (reconnectTask == null) {return true;}reconnectTask.wakeup();} catch (InterruptedException e) {LOG.debug("Reconnect task has been interrupted.", e);}}} else {// build backups on the next iterationbuildBackup = true;try {if (reconnectTask == null) {return true;}reconnectTask.wakeup();} catch (InterruptedException e) {LOG.debug("Reconnect task has been interrupted.", e);}}return result;}}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));}

这个reconnectTask的线程会在FailoverTransport调用start()方法的时候开始启用,连接建立后变成wait,且在连接被异常断开时被唤醒。

可用看到上面的代码中最重要的是调用doReconnect()方法。这个方法主要有以下几个部分:
1. 第一部分是用来处理配置形式uris的,如果有配置,优先读取配置的uri,并添加到重连uris中本身不做重连。
2.
第二部分是做负载重连的,根据重连uris如果第一个已经对应的Transport在工作了则无需重连直接返回,否则去掉当前工作的Transport达到负载的目的。

3.
第三部分是关于Transport的备份机制的,如果设置了备份机制,且有Transport已经备份则取出该备份Transport返回。这里也没有设置备份。
可以看到前三部分都没有进行实际的重连工作,第四部分才是在上述三部分都不存在的情况下,进行实际的重连工作。

4.
第四部分才是重连工作,根据uri找到对应的Transport,对该Transport设置独有的TransportListener并启动。然后设置到FailoverTransport的connectedTransport作为当前连接的Transport。

下面把方法中主要的第四部分摘录出来

 Iterator<URI> iter = connectList.iterator();while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {try {// SSL先忽略SslContext.setCurrentSslContext(brokerSslContext);// We could be starting with a backup and if so we wait to grab a// URI from the pool until next time around.// 没建立过连接就先建立一个if (transport == null) {uri = addExtraQueryOptions(iter.next());transport = TransportFactory.compositeConnect(uri);}LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);// 给transport增加一个transportListener,listener可以判断连接是否异常,异常时也唤醒reconnectTasktransport.setTransportListener(createTransportListener(transport));// 如果transport的start方法调用没问题,则表示连接建立成功transport.start();// 如果不是第一次建立连接,则需要通知服务器是一个重连的客户端if (started && !firstConnection) {restoreTransport(transport);}LOG.debug("Connection established");// 为下一次的重连进行配置reconnectDelay = initialReconnectDelay;connectedTransportURI = uri;connectedTransport.set(transport);connectedToPriority = isPriority(connectedTransportURI);reconnectMutex.notifyAll(connectFailures = 0;// Make sure on initial startup, that the transportListener// has been initialized for this instance.// 确保为这个连接加上listenersynchronized (listenerMutex) {if (transportListener == null) {try {// if it isn't set after 2secs - it probably never will belistenerMutex.wait(2000);} catch (InterruptedException ex) {}}}if (firstConnection) {firstConnection = false;LOG.info("Successfully connected to {}", uri);} else {LOG.info("Successfully reconnected to {}", uri);}return false;} catch (Exception e) {failure = e;LOG.debug("Connect fail to: {}, reason: {}", uri, e);if (transport != null) {try {transport.stop();transport = null;} catch (Exception ee) {LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);}}}

可以看到transport是通过这个语句建立的

  transport = TransportFactory.compositeConnect(uri);

需要注意的是这个时候传入的uri是不带failover的,因此如果设置的是形如failover:(tcp://localhost:61616)这样的连接配置,此时uri应该是
tcp://localhost:61616
。所以返回的transport是tcpTransport。

看着transportListener很重要的样子,那这个transportListener到底起什么作用呢。

 private TransportListener createTransportListener(final Transport owner) {return new TransportListener() {……@Override// 最主要的overide,当异常时调用handleTransportFailure方法public void onException(IOException error) {try {handleTransportFailure(owner, error);} catch (InterruptedException e) {Thread.currentThread().interrupt();if (transportListener != null) {transportListener.onException(new InterruptedIOException());}}}……};}

可以看到transportListener就是一个监控的线程,当发现transport异常时,会调用handleTransportFailure进行故障处理:

 public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {// 如果异常是由于连接正在关闭引起的,就不用管它if (shuttingDown) {// shutdown info sent and remote socket closed and we see that before a local close// let the close do the workreturn;}if (LOG.isTraceEnabled()) {LOG.trace(this + " handleTransportFailure: " + e, e);}// 重置transport为空// could be blocked in write with the reconnectMutex held, but still needs to be whackedTransport transport = null;if (connectedTransport.compareAndSet(failed, null)) {transport = failed;if (transport != null) {disposeTransport(transport);}}synchronized (reconnectMutex) {if (transport != null && connectedTransport.get() == null) {boolean reconnectOk = false;// 如果transport是started状态且重连次数未达上限,则表示可以开始尝试重连if (canReconnect()) {reconnectOk = true;}LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",connectedTransportURI, (reconnectOk ? "," : ", not"), e);failedConnectTransportURI = connectedTransportURI;connectedTransportURI = null;connectedToPriority = false;if (reconnectOk) {// notify before any reconnect attempt so ack state can be whackedif (transportListener != null) {transportListener.transportInterupted();}updated.remove(failedConnectTransportURI);// 唤醒reconnectTaskreconnectTask.wakeup();} else if (!isDisposed()) {propagateFailureToExceptionListener(e);}}}}

嗯,其实主要就是唤醒reconnectTask开始重连。。。

总结:FailoverTransport其实就是封装了一个reconnectTask,这个Task会在建立连接时启动,在连接异常时也会被唤醒,其主要就是对尝试建立连接的封装。

接下来还有个问题:
Transport什么时候会抛异常?因为环境中出现了部分异常客户端的连接处于CLOSE_WAIT的状态。且客户端没有再进行重连的尝试,其机制是怎样的呢?

带着这个疑问,后续再研究。

ActiveMQ源码解析(三)Failover机制相关推荐

  1. Redis源码解析(15) 哨兵机制[2] 信息同步与TILT模式

    Redis源码解析(1) 动态字符串与链表 Redis源码解析(2) 字典与迭代器 Redis源码解析(3) 跳跃表 Redis源码解析(4) 整数集合 Redis源码解析(5) 压缩列表 Redis ...

  2. Disruptor源码解析三 RingBuffer解析

    目录 系列索引 前言 主要内容 RingBuffer的要点 源码解析 系列索引 Disruptor源码解析一 Disruptor高性能之道 Disruptor源码解析二 Sequence相关类解析 D ...

  3. ActiveMQ源码解析 建立连接

    作为一个消息中间件,有客户端和服务端两部分代码,这次的源码解析系列主要从客户端的代码入手,分成建立连接.消息发送.消息消费三个部分.趁着我昨天弄明白了源码编译的兴奋劲头还没过去,今天研究一下建立连接的 ...

  4. OkHttp3源码解析(三)——连接池复用

    OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...

  5. 并发编程与源码解析 (三)

    并发编程 (三) 1 Fork/Join分解合并框架 1.1 什么是fork/join ​ Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,开发者可以在不去了解如Thread.R ...

  6. ReactiveSwift源码解析(三) Signal代码的基本实现

    上篇博客我们详细的聊了ReactiveSwift源码中的Bag容器,详情请参见<ReactiveSwift源码解析之Bag容器>.本篇博客我们就来聊一下信号量,也就是Signal的的几种状 ...

  7. 前端入门之(vuex源码解析三)

    上两节前端入门之(vuex源码解析二)我们把vuex的源码大概的撸了一遍,还剩下(插件.getters跟module),我们继续哈~ 插件童鞋们可以去看看vuex在各个浏览器的状态显示插件,小伙伴可以 ...

  8. 拆轮子-RxDownload2源码解析(三)

    本文为博主原创文章,未经允许不得转载 造轮子者:Season_zlc 轮子用法请戳作者链接 ↑ 前言 本文主要讲述 RxDownload2 的多线程断点下载技术. 断点下载技术前提 服务器必须支持按 ...

  9. Tomcat源码解析三:tomcat的启动过程

    Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...

  10. 【Vue.js源码解析 三】-- 模板编译和组件化

    前言 笔记来源:拉勾教育 大前端高薪训练营 阅读建议:建议通过左侧导航栏进行阅读 模板编译 模板编译的主要目的是将模板 (template) 转换为渲染函数 (render) <div> ...

最新文章

  1. 机器人瓦力船长机器人_警察“瓦力”来啦!机器人巡逻南京路 这样的它你喜欢吗?...
  2. linux 设置固定网络转发_linux服务器配置双网卡转发和静态路由及默认网关
  3. jpa 查找最后一条数据_查找数据的最后1条记录,你用了2小时,同事1分钟就搞定了...
  4. 【转】Hive学习路线图
  5. python知识:QT5的move应用
  6. 网易技术干货 | 云信移动端音视频UI自动化测试实践
  7. 插件~NuGet与packages管理项目的包包
  8. C++与Java异常处理的区别
  9. 【apiPost】-工具
  10. adb命令刷机vivox20_刷机常用adb命令及刷recovery脚本
  11. JS继承的几种方式及优缺点
  12. 优锘科技:数字孪生如何与新基建摩擦出智慧火花
  13. VScode输入感叹号无法生成HTML模板
  14. 治疗骨髓增殖性肿瘤的一种新型高选择性酪氨酸激酶抑制剂ZT55的研究
  15. 卷积网络(持续更新)
  16. 面向对象三大特性(多态)
  17. 解决 Agent JAR loaded but agent failed to initialize
  18. MathType 数学公式编辑器[Baidu]
  19. 数据库事务隔离级别和锁的实现方式
  20. MOS场效应管基本知识

热门文章

  1. 【论文复现】ARBITRAR : User-Guided API Misuse Detectionl
  2. 前言,flutter页面切换动画
  3. Fresco使用详情
  4. ipv6测试工具-支持ipv6的web压力测试工具curl-load
  5. Allegro PCB对边框倒角变成圆弧处理
  6. RainMeter学习3
  7. w ndows无法完成格式化,Windows无法完成格式化怎么办呢?教你解决U盘问题!
  8. Ubuntu14.04 Firefox无法播放视频
  9. TUP首期主题论坛报道:中小型开发商移动开发的生存之道
  10. html标签指定式权重,alternate和Canonical标签防止重复收录分散权重