为什么80%的码农都做不了架构师?>>>   

原创文章,转载请注明出处:http://jameswxx.iteye.com/blog/2096446
这里以消费者为例说明。一组消费者要消费某个topic,得先知道该topic分布在哪些broker上,某个broker上的topic分布可能会变化,一旦变化,生产者和消费者应该都能被通知到。通知模式有推和拉两种,客户端都是采取拉的模式,所以broker如有变化,通知都是有延迟的。
一 什么时候启动topic路由获取任务
两个地方:
1 首先是DefaultMQPushConsumerImpl启动时,见DefaultMQPushConsumerImpl的start方法里的 this .updateTopicSubscribeInfoWhenSubscriptionChanged();
2 另外DefaultMQPushConsumerImpl的start方法也启动了MQClientInstance,MQClientInstance的start方法里调用了 startScheduledTask()方法,该方法启动了获取路由的定时任务。
         // 定时从Name Server获取Topic路由信息
         this . scheduledExecutorService  .scheduleAtFixedRate( new  Runnable() {
             @Override
             public  void  run() {
                 try  {
                    MQClientInstance. this  .updateTopicRouteInfoFromNameServer();
                }
                 catch  (Exception e) {
                     log .error(  "ScheduledTask updateTopicRouteInfoFromNameServer exception" , e);
                }
            }
        }, 10,  this . clientConfig  .getPollNameServerInteval(), TimeUnit. MILLISECONDS  );
 
二 每隔多久获取一次
很简单,看定时任务每隔多久执行一次就知道了,这里的间隔参数是 this . clientConfig  .getPollNameServerInteval()。
ClientConfig的pollNameServerInteval 定义如下:
private int pollNameServerInteval = 1000 * 30;
DefaultMQPushConsumer继承了ClientConfig, pollNameServerInteval 默认是30秒,显然,这个时间是可以自己定义的,通过 DefaultMQPushConsumer的 setPollNameServerInteval()方法。
 
三 获取路由过程
看 MQClientInstance的 updateTopicRouteInfoFromNameServer()方法,该方法最终会调用下面这个方法,需要注意,对于消费者而言, isDefault参数永远是false。
   public  boolean  updateTopicRouteInfoFromNameServer( final  String topic,  boolean  isDefault, DefaultMQProducer defaultMQProducer) {
         try  {
             if  ( this . lockNamesrv  .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS  )) {
                 try  {
                    TopicRouteData topicRouteData;
                     if  (isDefault && defaultMQProducer !=  null ) {
                       //此处省略不必要的信息,对于消费者,分支不会走到这里来,因为 isDefault为false,且生产者肯定为空
                    }
                     else  {
                        topicRouteData =
                                 this . mQClientAPIImpl  .getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    //此处省略无关语句
                }
                 catch  (Exception e) {
                     if  (!topic.startsWith(MixAll. RETRY_GROUP_TOPIC_PREFIX  )
                            && !topic.equals(MixAll. DEFAULT_TOPIC  )) {
                         log .warn( "updateTopicRouteInfoFromNameServer Exception"  , e);
                    }
                }
                 finally  {
                     this . lockNamesrv  .unlock();
                }
            }
             else  {
                 log .warn( "updateTopicRouteInfoFromNameServer tryLock timeout {}ms" ,  LockTimeoutMillis );
            }
        }
         catch  (InterruptedException e) {
             log .warn(  "updateTopicRouteInfoFromNameServer Exception" , e);
        }
 
         return  false  ;
  }
其实最终都是通过 this  . mQClientAPIImpl  .getTopicRouteInfoFromNameServer(topic, 1000 * 3);得到的。
 
 
 
四 客户端与nameserver的连接关系
broker与所有nameserver都是长连接,如有变化,则向所有nameserver都发送消息。但是生产者和消费者只是跟某一台nameserver保持联系。 设定一个场景, 如果某个broker的topic配置发生了变化,它向所有nameserver发布通知,但是此时如果某一台nameserver推送失败(超时或者挂掉了),则nameserver集群之间的信息是不完整的,因为挂掉的那台nameserver没有得到最新变化。
由此衍生三个问题:
1 如果该nameserver不是挂掉,只是那一瞬间没有响应,那么待可正常服务时,刚才那个borker发生的变化应该能生效,不应该被丢弃,否则nameserver之间的数据是不同步的。
  解决方案:broker是定时向所有nameserver发送自己的注册信息的,如果当时某台nameserver挂掉重启或者超时,没关系,下次仍然会接受到上次没接收到的broker信息
2 如果真的挂掉了,但是很快又恢复了,因为borker和nameserver保持的是长连接,显然挂掉重新启动后,broker与nameserver的长连接无效了,应该能自动重连
  见 getAndCreateChannel方法分析
3 只要某个nameserver不可用,消费者应该能failover,每次应该都检查长连接是否还有效,若无效则 自动连接其他nameserver。
  见 getAndCreateNameserverChannel()方法分析
 
带着这个疑问,看看 this  . mQClientAPIImpl  .getTopicRouteInfoFromNameServer(topic, 1000 * 3)方法。 这个方法向nameserver发起调用,获取路由结果
RemotingCommand request =  RemotingCommand.createRequestCommand(RequestCode.  GET_ALL_TOPIC_LIST_FROM_NAMESERVER  ,  null );
RemotingCommand response =  this  . remotingClient  .invokeSync(  null , request, timeoutMillis);
重点在于 remotingClient  .invokeSync方法,如下
@Override
     public  RemotingCommand invokeSync(String addr,  final  RemotingCommand request,  long  timeoutMillis)
             throws  InterruptedException, RemotingConnectException, RemotingSendRequestException,
            RemotingTimeoutException {
        //这里获取连接,该方法里面会做连接的检查和恢复
         final  Channel channel =  this  .getAndCreateChannel(addr);
 
        //最后如果还是不是有效连接,则关闭连接,抛出异常
         if  (channel !=  null  && channel.isActive()) {
             try  {
                 if  ( this  . rpcHook  !=  null ) {
                     this  . rpcHook  .doBeforeRequest(addr, request);
                }
                RemotingCommand response =  this  .invokeSyncImpl(channel, request, timeoutMillis);
                 if  ( this  . rpcHook  !=  null ) {
                     this  . rpcHook  .doAfterResponse(request, response);
                }
                 return  response;
            }
             catch  (RemotingSendRequestException e) {
                 log  .warn( "invokeSync: send request exception, so close the channel[{}]" , addr);
                 this  .closeChannel(addr, channel);
                 throw  e;
            }
             catch  (RemotingTimeoutException e) {
                 log  .warn( "invokeSync: wait response timeout exception, the channel[{}]" , addr);
                 // 超时异常如果关闭连接可能会产生连锁反应
                 // this.closeChannel( addr, channel);
                 throw  e;
            }
        }
         else  {
             this  .closeChannel(addr, channel);
             throw   new  RemotingConnectException(addr);
        }
    }

这个方法大体分为两步,第一步获取连接,第二步通过连接发送请求,获取连接当然是 getAndCreateChannel方法了, getAndCreateChannel方法非常重要,它包含了客户端对nameserver的failover,也包含了自动重连功能, 对于客户端,传入的addr参数都是null,所以一直会走到 getAndCreateNameserverChannel()方法。

    private  Channel  getAndCreateChannel (  final  String addr)  throws  InterruptedException {
        //无论是producer还是consumer,传进来的 addr参数都是null
         if  ( null  == addr)
             return  getAndCreateNameserverChannel();
 
        //因为客户端传入的addr是null,所以客户端不会走到这里来,只有broker才会走到这里来,因为broker传入的addr不为null
        ChannelWrapper cw =  this  . channelTables  .get(addr);
         if  (cw !=  null  && cw.isOK()) {
             return  cw.getChannel();
        }
 
        //注意,如果和某个addr的连接不OK了,则再向该nameserver发起重连
         return   this  .createChannel(addr);
    }
 
createChannel方法很简单,无非就是创建连接嘛,就不细看了,分析下 getAndCreateNameserverChannel(),以下是该方法大致过程:

因为客户端都是与某一台nameserver长连接,因此长连接一旦选定,后面不会变化,除非nameserver挂掉,所以已建立的长连接要保存起来。下面这段逻辑就是如此。
       String addr =  this  . namesrvAddrChoosed  .get();
         if  (addr !=  null ) {
            ChannelWrapper cw =  this  . channelTables  .get(addr);
             //注意这里,虽然长连接已经建立了,但是每次调用时,仍然要通过“ cw != null && cw.isOK()”检查连接是否OK。
              if  (cw !=  null  && cw.isOK()) {
                 return  cw.getChannel();
            }
        }
如果连接没有建立或连接已经断开,则继续往下,真正创建连接时需要加锁的
  if ( this . lockNamesrvChannel .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS ))
下面的代码都是在这个if块里面
这里又执行了一边上面的获取连接并检测的代码,可以连接,因为有时候连接只是偶尔不OK的
     addr =  this .  namesrvAddrChoosed  .get();
                 if  (addr !=  null ) {
                    ChannelWrapper cw =  this  . channelTables  .get(addr);
                     if  (cw !=  null  && cw.isOK()) {
                         return  cw.getChannel();
                    }
                }
接着往下, 这段代码非常重要

namesrvIndex指示了当前跟哪个nameserver发生连接,初始值是个随机数,跟nameserver数量取模,走到这一步,要么是首次发起调用,之前连接还未创建现在要创建了,或者是已创建的连接无效了要连接下一个nameserver,就是“cw.isOK()”为false。
 

         if  (addrList !=  null  && !addrList.isEmpty()) {
                     for  ( int  i = 0; i < addrList.size(); i++) {
                         int  index =  this  . namesrvIndex  .incrementAndGet();
                        index = Math. abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);
 
                         this  .namesrvAddrChoosed.set(newAddr);
                        Channel channelNew =  this  .createChannel(newAddr);
                         if  (channelNew !=  null )
                             return  channelNew;
                    }
                }

转载于:https://my.oschina.net/boltwu/blog/473025

RocketMQ topic路由相关推荐

  1. RocketMQ Topic/Group/Tags介绍

    Topic 功能介绍 Topic是RocketMQ里对消息的一级归类. RocketMQ通过Topic完成消息的发布和订阅.消息生产者将消息发送到Topic中,而消息消费者则通过订阅该Topic来消费 ...

  2. RocketMQ 消息路由解析——图解、源码级解析

  3. RocketMQ:NameServer路由管理源码分析

    文章目录 NameServer路由管理 1.前言 2.路由元信息 3.路由注册 3.1Broker路由注册 3.2NameServer处理路由注册 3.3路由删除 3.3.1Broker异常关闭 3. ...

  4. RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等

    RocketMQ 是什么 Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点 ...

  5. activemq后台管理 看topic消息_17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列...

    作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/question/43557507 一.资料文档 二.开发语言 三.支持的协议 四.消息存储 五.消息事务 ...

  6. RocketMQ自动创建topic原理-TBW102

    自动创建Topic原理介绍   RocketMQ在发送消息的时候,我们一般会先去Broker创建Topic信息,Producer在发送消息的时候会先去nameSrv拉取Topic信息,那么如果拉取不到 ...

  7. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  8. RocketMQ—Producer(二)路由动态更新

    一.Producer路由信息 从NameServer章节分析得知,路由信息存储在NameServer,生产端和消费端定时向NameServer获取topic相关的路由信息: 从生产者启动流程得知: 路 ...

  9. RocketMQ快速入门之手动创建topic

    package cn.learn.rocketmq.topic;import org.apache.rocketmq.client.producer.DefaultMQProducer;public ...

最新文章

  1. 【算法与数据结构】在n个数中取第k大的数(基础篇)
  2. 终于看腻了黄色!让它五彩斑斓起来!
  3. IsWindow,findwindow
  4. 【渝粤教育】国家开放大学2018年秋季 0314-21T兽医基础 参考试题
  5. mysql 从服务器同步设置_mysql主从同步配置
  6. Xpath语法学习记录
  7. 阿里腾讯前端一面小结
  8. 算法导论 资源 课后答案 PDF
  9. 《七哥说道》第八章:约在410,北漂可还行?
  10. Tomcat使用总结
  11. zuiqingchun4
  12. img实现图片加载前默认图片,加载时替换真实图片,加载失败时替换加载失败图片
  13. spring @annotation 注解
  14. 微信小程序wepy框架快速入门
  15. 常见的网络连接设备有哪些?
  16. WPF 触发器Triggers
  17. 多层神经元感知器模型_使用多层感知器模型对星系进行分类
  18. AV1代码学习:av1_foreach_transformed_block_in_plane函数
  19. 给你的热图挑选一个合适的渐变色
  20. 蓝牙nrf52832的架构和开发

热门文章

  1. python字符复制函数是啥_Python最全的字符和字符串函数,直接复制到IDLE或另存为py可以运行...
  2. oracle 9i sql_id,Oracle9i增添 wm_concat函数(转)
  3. java和opencv配置_Java——OpenCVWindows配置和项目中jar包的简单配置
  4. java swing事件监听_Java swing(awt):事件监听机制的实现原理+简单示例
  5. java 解析csv 乱码_Java采用opencsv解析csv文件以及解析中文乱码问题
  6. UML工具:EA(Enterprise Architect)
  7. 浙大网新实训项目介绍
  8. [JS] 修改Navigator对象
  9. Python Demo 02 蒙特卡罗方法输出e
  10. elasticsearch api中的Buckets(桶)及Metrics(指标)