大家好,我是烤鸭:

上一篇简单介绍和rocketmq,这一篇看下源码之注册中心。

namesrv

先看两个初始化方法
NamesrvController.initialize() 和 NettyRemotingServer.start();

public boolean initialize() {// 加载配置文件this.kvConfigManager.load();// 创建 NettyRemotingServer 并初始化参数this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 将刚才的线程池和netty server 绑定this.registerProcessor();// 每隔10s检测最近120s不活跃的broker并移除this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 每隔10分钟输出一下配置this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);// 如果想用tls,ssl协议的话,需要证书构造 sslContextif (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}return true;
}

NettyRemotingServer.start()

public void start() {// 用刚才初始化的线程池创建线程this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 构建netty 相关的handler,包含连接、读数据、解码、请求和响应处理prepareSharableHandlers();// 创建netty server,使用初始化的参数和刚才的handler初始化channelServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 启动channel的监听器,针对channel的连接、关闭、异常、空闲(后面其他的实现都是关闭逻辑)if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 每秒处理过时的响应,如果超时时间+1秒没响应,就移除该请求并手动回调(由于注册中心没有对外发请求,所以没用到,client和server用到了)this.timer.scheduleAtFixedRate(new TimerTask() {从@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

再看下 NettyClientHandler,对请求和响应指令进行处理

/*** Entry of incoming command processing.** <p>* <strong>Note:</strong>* The incoming remoting command may be* <ul>* <li>An inquiry request from a remote peer component;</li>* <li>A response to a previous request issued by this very participant.</li>* </ul>* </p>** @param ctx Channel handler context.* @param msg incoming remoting command.* @throws Exception if there were any error while processing the incoming command.*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:// 接收请求并处理processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 接收响应,维护responseTable(注册中心用不到)processResponseCommand(ctx, cmd);break;default:break;}}
}

由于注册中心没有发起 request,看下 processRequestCommand(接收request)

/*** Process incoming request command issued by remote peer.** @param ctx channel handler context.* @param cmd request command.*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// request的code在 RequestCode 类维护,包括 发送、拉取等等final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;// 自增计数final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// ACL鉴权 (client端和broker使用)doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 异步 or 同步if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();// 比较重要的地方,单独分析RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};// 系统繁忙,注册中心不会提示这个(broker 刷盘不及时会报这个)if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {// 避免日志打印的太多if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}// 不是单向请求(onewayRPC,线程池满的话,直接返回系统繁忙)if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}

我们先看一下 NettyRequestProcessor.processRequest 实现

DefaultRequestProcessor.processRequest

其实看名字就能看出来 注册中心的操作了

public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:// admin调用,配置添加到 configTable,定期打印return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:// admin调用,获取配置return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:// admin调用,删除配置return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:// broker 获取topic配置return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:// 注册broker,版本不同处理逻辑有些不一样(topic配置信息封装不同)Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:// 下线 brokerreturn this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:// 根据topic获取路由信息,获取的key是 ORDER_TOPIC_CONFIG+topicidreturn this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:// 获取broker 集群信息return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:// 废除broker的写入权限return this.wipeWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:// 获取所有的topicreturn getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:// 删除topicreturn deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:// 根据namespace获取配置return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:// 根据cluster下的broker获取topicreturn this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:// 获取cluster、broker和关联信息return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:// 设置unit_mode true && 非重试的时候,这个配置好像没用啊(https://github.com/apache/rocketmq/issues/639)return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:// 设置unit_mode true(校验消息和心跳的时候),获取topicreturn this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:// !getUnitTopicList && getHasUnitSubTopicListreturn this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:// 更新注册中心配置return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:// 获取注册中心配置return this.getConfig(ctx, request);default:break;}return null;
}

小结

注册中心的作用:

存了 cluster、broker、topic的信息。

提供了一些接口,可以broker注册和下线,修改配置等。

检测和维护broker是否活跃。

rocketmq 初探(二)相关推荐

  1. rocketmq 初探(三)

    大家好,我是烤鸭: 上一篇介绍了注册中心,这一篇看下broker.基于 rocketmq 4.9 版本. BrokerStartup#BrokerController 按照代码的先后顺序撸源码: Br ...

  2. mysql mgr简介_mysql8.0初探:(二)MySQL Group Replication-MGR集群简介

    mysql8.0初探:(二)MySQL Group Replication-MGR集群简介 发布时间:2020-06-12 23:59:17 来源:51CTO 阅读:49832 作者:arthur37 ...

  3. 525、Java工程师的进阶之路 -【 RocketMQ (二)】 2022.01.06

    目录 1. RocketMQ 设计目的 1.1. 发布/订阅 1.2. 消息优先级 1.3. 消息顺序 1.4. 消息过滤 1.5. 消息持久化 1.6. 消息可靠性 1.7. 消息实时性 1.8. ...

  4. 图像文字识别初探(二)-FAN(Focusing Attention Network)

    图像文字识别初探(一)-CRNN(Convolution Recurrent Neural Network)和DTRN(Deep-text Recurrent Network) 图像文字识别初探(二) ...

  5. 内网渗透初探(二) | 重新学习内网渗透全过程

    一.前言 前面写了一篇内网渗透初探(一),写的不是特别好,然后也是在学习内网渗透相关的东西,就将其整理了一下,加了自己的思路,写好这篇内网渗透初探(二)- 二.环境介绍 专门做了个拓扑图,首先外网打点 ...

  6. rocketmq 初探(一)

    大家好,我是烤鸭: 今天看下rocketmq.这篇主要是简单介绍下 rocketmq以及idea 本地调试 rocketmq. 项目架构 感兴趣的可以下载源码看下. https://github.co ...

  7. CTF密码学题目初探(二)

    CTF密码学题目初探(二) 密码学总结(一) 1.换位加密 2.替换加密 密码学总结(一) 在上一篇文章里写了12种常见的编码,这一篇文章主要总结换位加密和替换加密. 1.换位加密 栅栏密码(Rail ...

  8. 招聘季,面试前知道RocketMQ这二十三点,大厂面试稳了

    基础 1.为什么要使用消息队列呢? 消息队列主要有三大用途,我们拿一个电商系统的下单举例: 解耦:引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据--引入消息队 ...

  9. RocketMQ初探(五)之RocketMQ4.2.6集群部署(单Master+双Master+2m+2s+async异步复制)

    以下部署方式结合众多博友的博客,经过自己一步一步实际搭建,如有雷同,侵权行为,请见谅...其中遇到不少的坑,希望能帮到更多的人,现在很少能找到一份完整版4.2.6版本的搭建教程了,如果你有幸遇见,那么 ...

最新文章

  1. 第二十六课.深度强化学习(一)
  2. 笔记68 Redis数据库
  3. 深度学习在情感分析中的应用
  4. springboot整合shiro-关于登出时,redis中缓存没有清理干净的问题
  5. UBUNTU安装 Rabbitvsc可视化版本控制客户端软件
  6. Python 已经饱和?我猜你一定不懂这个技能!
  7. Hibernate二次学习一----------搭建Hibernate
  8. mysql5.6.20开启慢查询日志以及创建索引优化慢查询
  9. 1过程流程图 3 apqp_干货 | APQP过程流程图及最新版全套表格汇总,收藏备用!
  10. python中递归函数的基例_详谈Python基础之内置函数和递归 Python递归和循环的区别...
  11. Wijmo 5 与Breeze 的组合,及与METRONIC 的集成
  12. VMware Funsion 修改vmnet1/vmnet8默认网络地址及DHCP地址
  13. 安装Adobe Reader出错回滚
  14. 最简单的黑客帝国代码雨教程C++
  15. Ribbon界面开发
  16. WIN10 下的erlang + rabbitmq安装,以及遇到的问题
  17. 《矩阵论引论》田振际——状态方程的约当规范形
  18. 【Linux4.1.12源码分析】协议栈gro收包之TCP处理
  19. Pycharm 引入类报错Unresolved reference ‘attempt_load‘
  20. Kafka的offset自定义存储实现

热门文章

  1. [html] iframe在更改了src之后,不出现后退或者前进按钮怎么解决?
  2. [html] 图片上传时实现本地预览功能的原理是什么?
  3. [vue] 请描述下vue的生命周期是什么?
  4. [css] 什么是FOUC?你是如何避免FOUC的?
  5. [css] CSS3中的transition是否可以过渡opacity和display?
  6. 工作81:图片间隙问题
  7. 前端学习(1533):angular简介
  8. 前端学习(1372):构建模块化路由
  9. 前端学习(753):js没有块级作用域
  10. java学习(12):i++和++i的区别