rocketmq namesrv 第一章启动过程
大家好,很高兴在这里跟大家分享下rocketmq源码实现,如有不对的地方欢迎指正。
Namesrv顾名思义就是名称服务,是没有状态可横向扩展的服务。废话不多说了,直接贴代码。。
1,入口函数NamesrvStartup.main0
1.1 System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
设置版本号属性
1.2 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketSndbufSize)) {
NettySystemConfig.SocketSndbufSize = 2048;
}
如果缓冲区为空 默认为2M
1.3 if (null == System.getProperty(NettySystemConfig.SystemPropertySocketRcvbufSize)) {
NettySystemConfig.SocketRcvbufSize = 1024;
}
如果接受缓冲区为空 默认为1M
1.4 PackageConflictDetect.detectFastjson();
检测包冲突,因为rocketmq 配置都是用的fastJson作为序列化工具,所以最低使用的是1.2.3版本,如果版本过低会抛出异常
1.5 Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine =
ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
解析命令行参数,为用户提供了一个解释命令行的API不懂的可以查查 http://blog.csdn.net/faye0412/article/details/2949753
1.6 final NamesrvConfig namesrvConfig = new NamesrvConfig();
初始化Namesrv 全局配置文件
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
rocketmq home路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv"
+ File.separator + "kvConfig.json";
KV配置持久化地址,具体含义讲到了在给大家说
1.7 final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876); 设置namesrv监听端口
Namesrv Netty服务全局配置,大家都知道rocketmq 底层通信使用的Netty4,下面给出各个参数的意义
private int listenPort = 8888;默认 监听端口
private int serverWorkerThreads = 8;Netty服务工作线程数量
private int serverCallbackExecutorThreads = 0;Netty服务异步回调线程池线程数量
private int serverSelectorThreads = 3;Netty Selector线程数量
private int serverOnewaySemaphoreValue = 256;控制单向的信号量
private int serverAsyncSemaphoreValue = 64;控制异步信号量
private int serverChannelMaxIdleTimeSeconds = 120;服务空闲心跳检测时间间隔 单位秒
private int serverSocketSndBufSize = NettySystemConfig.SocketSndbufSize;Netty发送缓冲区大小
private int serverSocketRcvBufSize = NettySystemConfig.SocketRcvbufSize;Netty接受缓冲区大小
private boolean serverPooledByteBufAllocatorEnable = false;是否使用Netty内存池
1.8 if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
System.out.println("load config properties file OK, " + file);
in.close();
}
}
根据命令行参数,加载外部namesrvConfig及nettyServerConfig全局配置文件
1.9 if (commandLine.hasOption('p')) {
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
打印namesrvConfig及nettyServerConfig配置信息
2.0 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
将命令行指定的propertis文件 解析加载到namesrvConfig配置中
if (null == namesrvConfig.getRocketmqHome()) {
System.out.println("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation");
System.exit(-2);
}
注意:这里必须要设置rocketMq home路径,因为要加载conf配置文件
2.1 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NamesrvLoggerName);
初始化logback日志工厂,rocketMq默认使用logback作为日志输出,不知道的可以去查查
2.2 MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
打印namesrv配置参数
2.3 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
创建NamesrvController对象,namesrv 主要控制类
this.namesrvConfig = namesrvConfig;设置namesrc配置
this.nettyServerConfig = nettyServerConfig;设置Netty配置
this.kvConfigManager = new KVConfigManager(this);创建KV配置管理 具体用到在解释
this.routeInfoManager = new RouteInfoManager();创建路由信息管理
this.brokerHousekeepingService = new BrokerHousekeepingService(this);创建broker连接事件处理服务
boolean initResult = controller.initialize();初始化分为几步
2.3.1 this.kvConfigManager.load();加载Kv配置文件
String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
}
}
rocketmq 默认使用json数据格式存储配置
默认路径为:System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
如果配置kvConfigSerializeWrapper不为空则放入configTable容器里,
2.3.2 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);初始化通信层
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());设置父类信号量
this.serverBootstrap = new ServerBootstrap();创建 netty 启动类
this.nettyServerConfig = nettyServerConfig;设置netty 配置信息
this.channelEventListener = channelEventListener;设置扩展的一个时间监听
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();获取异步线程数量
if (publicThreadNums <= 0) {
publicThreadNums = 4; 如果小于等于0 默认设置为4
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
创建 处理Callback应答线程池
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,
String.format("NettyBossSelector_%d", this.threadIndex.incrementAndGet()));
}
});
创建netty eventLoopGroupBoss 默认线程为1
this.eventLoopGroupWorker =
new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerSelector_%d_%d", threadTotal,
this.threadIndex.incrementAndGet()));
}
});
创建 netty eventLoopGroupWorker 默认线程为3
2.3.3 this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
初始化服务端网络请求处理线程池
2.3.4 this.registerProcessor();
注册Name Server网络请求处理
2.3.5 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
NamesrvController初始化时启动线程定时调用RouteInfoManger的scanNotActiveBroker方法来定时清理不 活动的broker(默认两分钟没有向namesrv发送心跳更新BrokerLiveInfo时间戳的),比较BrokerLiveInfo的时间戳, 如果过期关闭channel连接
2.3.6 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
启动线程定时调用printAllPeriodically方法打印KV配置信息包含变化的配置
2.3.7 controller.start(); 调用NamesrvController.start()启动Netty及相关服务
2.3.8 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyServerConfig.getServerWorkerThreads(), //
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
创建默认的事件线程组,用于接收netty事件
2.3.9 ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NioServerSocketChannel.class)使用NIO selector
.option(ChannelOption.SO_BACKLOG, 1024) 输入连接指示(对连接的请求)的最大队列长度被设置为 backlog 参数。如果队列满时收到连接指示,则拒绝该连接
.option(ChannelOption.SO_REUSEADDR, true) 设置了SO_REUSADDR的应用可以避免TCP 的 TIME_WAIT 状态 时间过长无法复用端口
.option(ChannelOption.SO_KEEPALIVE, false) 设置心跳参数 FALSE为不启用参数
.childOption(ChannelOption.TCP_NODELAY, true)开启TCP_NODELAY表示package被忽略size尽快地发送。
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) socket发送缓冲区大小
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) socket接收缓冲区大小
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) localAddress方法用于绑定服务器地址和端口
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup, 默认事件接收线程组
new NettyEncoder(), netty编码器
new NettyDecoder(), netty×××
new IdleStateHandler(0, 0, nettyServerConfig .getServerChannelMaxIdleTimeSeconds()), 空闲链路状态处理
new NettyConnetManageHandler(), 自实现空闲链路处理
new NettyServerHandler()); netty服务处理
}
});
2.3.10 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
是否启用推外内存,默认不使用
2.3.11 ChannelFuture sync = this.serverBootstrap.bind().sync();
绑定netty监听端口
2.3.12 if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
启动事件线程
2.3.13 this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
}
catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
每隔1秒扫描下异步调用超时情况
转载于:https://blog.51cto.com/haqiaolong/1631283
rocketmq namesrv 第一章启动过程相关推荐
- cisco路由器启动过程
巧改启动方式修复路由器故障 作者:李夏艳 在Windows操作系统中,管理员在启动时可以按F8键,选择不同的启动方式来修复故障.如以VGA模式进入解决分辨率不匹配的问题等等.其实在思科的路由器中也有类 ...
- 第一章 PX4-Pixhawk-程序编译过程解析
第一章 PX4程序编译过程解析 PX4是一款软硬件开源的项目,目的在于学习和研究.其中也有比较好的编程习惯,大家不妨可以学习一下国外牛人的编程习惯.这个项目是苏黎世联邦理工大学的一个实验室搞出来的.该 ...
- 第一章 PX4程序编译过程解析
版权声明:本文为博主原创文章,未经博主允许不得转载. 第一章 PX4程序编译过程解析 PX4是一款软硬件开源的项目,目的在于学习和研究.其中也有比较好的编程习惯,大家不妨可以学习一下国外牛人的编程习惯 ...
- Kali Linux 秘籍 第一章 安装和启动Kali
第一章 安装和启动Kali 作者:Willie L. Pritchett, David De Smet 译者:飞龙 协议:CC BY-NC-SA 4.0 简介 Kali Linux,简称Kali,是用 ...
- Linux开机启动过程(2):内核启动的第一步
在内核安装代码的第一步 本文是在原文基础上经过本人的修改. 内核启动的第一步 在上一节中我们开始接触到内核启动代码,并且分析了初始化部分,最后我们停在了对main函数(main函数是第一个用C写的函数 ...
- 第一章 Hadoop启动Shell启动脚本分析--基于hadoop-0.20.2-cdh3u1
我的新浪微博:http://weibo.com/freshairbrucewoo. 欢迎大家相互交流,共同提高技术. 第一章 Hadoop启动Shell启动脚本分析 第一节 start-all.sh脚 ...
- TCPIP网络编程第一章踩坑过程 bind() error connect() error
目录 服务端和客户端代码 设备选择 过程 最近在学习TCP/IP网络编程,第一章就卡了好久,特地写这个来记录过程 服务端和客户端代码 hello_client,c #include <stdio ...
- 一篇文章看明白 Android Service 启动过程
Android - Service 启动过程 相关系列 一篇文章看明白 Android 系统启动时都干了什么 一篇文章了解相见恨晚的 Android Binder 进程间通讯机制 一篇文章看明白 An ...
- linux系统的启动过程 5个步骤,第5章 Linux系统启动过程.ppt
<第5章 Linux系统启动过程.ppt>由会员分享,可在线阅读,更多相关<第5章 Linux系统启动过程.ppt(26页珍藏版)>请在人人文库网上搜索. 1.第5章 Linu ...
最新文章
- 【Sql Server】DateBase-事务
- 计算机软件属于输入还是输出,计算机基本输入输出系统是什么意思(基本输入输出系统简介)...
- 信息化道路上,这两家龙头企业做了什么
- 第七周项目一-一般函数(2)
- JAVA-接口和抽象类的区别
- oracle 约束 Oracle 10g学习系列(5)
- gradle docker_带有Gradle的Docker容器分为4个步骤
- react学习(54)--注意传递请求
- linux 的一些实用工具,linux 命令行下的一些实用工具
- 欧莱雅收购AI公司ModiFace,想让自拍照“一键上妆”
- linux日志打印规则,Linux 打印简单日志(一)
- 没学好 Netty ,要凉?
- 微型计算机原理与应用彭楚武,微型计算机原理及其应用
- Xcode8兼容iOS7的解决方法
- 批处理创建隐秘的加密文件夹,是男人就把秘密藏起来
- python中的sheet,Python中的Smartsheet库:模块'smartsheet.sheets'没有属性'sheets'
- Dockerfile构建Springboot镜像
- notepad编译java文件_notepad编译java
- CF71A Way Too Long Words(string简单模拟)
- CEA-861-D infoframe
热门文章
- BSTR与CString之前的转换
- VC MFC列表视图(CListCtrl)控件
- 软考信息系统项目管理师_信息系统综合测试与管理---软考高级之信息系统项目管理师027
- 大数据之-Hadoop3.x_Yarn_公平调度器---大数据之hadoop3.x工作笔记0146
- Netty工作笔记0082---TCP粘包拆包实例演示
- js工作笔记004---加载数据延迟导致的不确定问题的解决_setTimeout和window_onload
- 工作资讯003---甘特图
- 数据库工作笔记009---Centos中导出mysql数据库
- Linux工作笔记030---Centos7.3启动tomcat 输入startup.sh后提示command not found
- java零碎要点---大型软件部署方案,磁盘阵列,raid提升硬盘性能,解决由于集群带来的文件共享问题