powerjob的worker启动,研究完了这块代码之后我发现了,代码就是现实中我们码农的真实写照
这是一篇让你受益匪浅的文章,代码即使人生。
worker启动比server启动要复杂一些,毕竟worker是要实际干活的,工欲善其事必先利其器,所以需要准备的工具还是不能少的,server对于powerjob来说,只是一个调度用的,说白了就是管理worker做什么工作的,只需要给他一个流程,让他按照流程上的内容,一次告诉worker去工作,至于怎么做,只有worker知道,server当然不会知道的,也没有必要知道。
worker的启动大概分为以下这么几个步骤:
判断是否重复初始化
获取默认配置
校验appName
获取IP地址和端口(这一步和server端是一样的,在这里就不赘述了)
初始化定时线程
连接server
初始化Akka
初始化日志系统
初始化存储
初始化定时任务
步骤是蛮多的,但是其实都不是非常的复杂
由于worker的启动源码过于多了,就不全贴出来了。
开胃菜
首先因为该worker包是需要被依赖的,所以并没有spring的启动类,但是却有启动spring时添加其配置的内容,在worker包里面的PowerJobWorker类,是实现了ApplicationContextAware, InitializingBean, DisposableBean这三个类,这三个类默认有三个方法,分别是
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
public void afterPropertiesSet() throws Exception
public void destroy() throws Exception
99.9%的初始化工作都是在afterPropertiesSet这个方法里完成的,看名字大概也能猜出这个方法的意思,就是字面意思。
判断是否重复初始化
if (!initialized.compareAndSet(false, true)) {log.warn("[PowerJobWorker] please do not repeat the initialization");return;
}
这段代码意思就是一个initialized的变量,代表的意思是是否初始化,一开始的时候是false,因为还没有开始初始化,然后compareAndSet后面跟着两个参数,第一个参数是预期值,如果预期值和当前的变量值一样,则将当前变量更新为第二个参数的值。
如果initialized的值是false ,和预期值一样,则compareAndSet方法返回的是true,跳出if条件语句,并且initialized值变成了true。
如果initialized的·值是true,和预期值不一样,则compareAndSet返回的是false,进入条件语句,打印告警日志,并且不再有后续的初始化操作,此时initialized的值不变,依旧是true。
获取默认配置
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();//下面这些代码都是在之后的初始化操作中进行赋值的
workerRuntime.setWorkerAddress(workerAddress);workerRuntime.setServerDiscoveryService(serverDiscoveryService);workerRuntime.setActorSystem(actorSystem);workerRuntime.setOmsLogHandler(omsLogHandler);workerRuntime.setTaskPersistenceService(taskPersistenceService);
这个WorkerRuntime类是worker.common包里面的一个Bean类,记录了一些worker运行时的参数和环境,里面有的有默认值,有的没有默认值,需要在初始化的时候进行赋值。比如上面代码中,后面set的值
校验appName
我将里面有关打印日志的部分全部拿掉了,通过appName,去server请求appId,如果请求不到,则说明配置文件里面的“powerjob.worker.app-name”配置的有问题,所有appName都是需要注册的,所以名字是不会重复的。
private void assertAppName() {//获取到appNamePowerJobWorkerConfig config = workerRuntime.getWorkerConfig();String appName = config.getAppName();Objects.requireNonNull(appName, "appName can't be empty!");//调用server端的服务String url = "http://%s/server/assert?appName=%s";for (String server : config.getServerAddress()) {
//获取到server的请求地址String realUrl = String.format(url, server, appName);try {
//请求服务,返回结果String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
//解析返回结果ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);if (resultDTO.isSuccess()) {
//将appId设置到运行环境里Long appId = Long.valueOf(resultDTO.getData().toString());workerRuntime.setAppId(appId);return;}else {throw new PowerJobException(resultDTO.getMessage());}}catch (PowerJobException oe) {throw oe;}catch (Exception ignore) {}}throw new PowerJobException("no server available!");
}
连接Server
serverDiscoveryService.start(timingPool);
最主要的就是上面这一行代码,这个代码里面主要流程如下:
将配置文件里面的服务器地址存入内存。
当前服务地址如果不为空,调用server端的/acquire服务获取结果。
如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。
如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。
如果得到结果,则将结果返回。
private String discovery() { //1.将配置文件里面的服务器地址存入内存。if (ip2Address.isEmpty()) {config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));}String result = null; //2.当前服务地址如果不为空,调用server端的/acquire服务获取结果。String currentServer = currentServerAddress;if (!StringUtils.isEmpty(currentServer)) {String ip = currentServer.split(":")[0];String firstServerAddress = ip2Address.get(ip);if (firstServerAddress != null) {result = acquire(firstServerAddress);}} //3.如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。for (String httpServerAddress : config.getServerAddress()) {if (StringUtils.isEmpty(result)) {result = acquire(httpServerAddress);}else {break;}}if (StringUtils.isEmpty(result)) { //4.如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。if (FAILED_COUNT++ > MAX_FAILED_COUNT) {List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();if (!CollectionUtils.isEmpty(frequentInstanceIds)) {frequentInstanceIds.forEach(instanceId -> {TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);taskTracker.destroy();});}FAILED_COUNT = 0;}return null;} else { //5.如果得到结果,则将结果返回。FAILED_COUNT = 0;return result;} }
初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);
这个日志系统的主要作用,就是将本地的日志上报的server上,从传进的参数就能看出,都是和通讯相关的内容。
这个日志系统的提交也是异步单独占用一个线程,在之前开启的线程中,其中就有一个线程是用来提交日志的,该线程会在worker启动的最后开启,代码段如下:
timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
固定每5秒提交一次日志。
初始化存储
worker使用的是本地的H2数据库,持久化的策略分为磁盘和内存,在worker停止的时候,会将本地的数据文件全部销毁。其主要的初始化代码在worker.persistence包里面的ConnectionFactory类中,源代码如下:
private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);public synchronized void initDatasource(StoreStrategy strategy) {strategy = strategy == null ? StoreStrategy.DISK : strategy;HikariConfig config = new HikariConfig();config.setDriverClassName(Driver.class.getName());config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);config.setAutoCommit(true);// 池中最小空闲连接数量config.setMinimumIdle(2);// 池中最大连接数量config.setMaximumPoolSize(32);dataSource = new HikariDataSource(config);try {FileUtils.forceDeleteOnExit(new File(H2_PATH));}catch (Exception ignore) {} }
HikariCP 是一个高性能的 JDBC 连接池组件,HikariConfig 就是其相关的配置类。
总结
worker工作起来确实不是很容易,需要找到自己的上级,还需要记录自己工作的日志,需要一个人干好多任务,还需要再不耽误正常任务的同时,向自己的上级汇报工作,汇报自己的身体状态。简直就是我们底层程序员的真实写照啊。里面使用了很多经典的技术,也有比较新的技术,对于日志系统,做的还是让我学到了很多。
powerjob的worker启动,研究完了这块代码之后我发现了,代码就是现实中我们码农的真实写照相关推荐
- 用 VR 检查代码,码农们的必备神器!
点击上方"CSDN",选择"置顶公众号" 关键时刻,第一时间送达! "跳进代码库,感觉自己就是宇宙的中心."这是近期网友@沈寒爸的一句评论, ...
- 【代码质量】码农提升代码质量的三重境界
前言:文章的开头,我要先讲个段子(真实案例),玩过拳皇的同学应该对下面这张图不陌生(一不小心暴露年龄了): 本以为这种也就是作为笑料,有意为之,然而就在我工作的第三年,确实碰到了这样的代码,老夫掰起脚 ...
- Spark Worker启动源码
worker的启动流程和master的启动流程相同,区别就是在worker启动完成后,要向Master注册.汇报资源.在Worker的onStart()方法中会调用registerWithMaster ...
- 计算机蓝屏无法启动代码50,电脑蓝屏代码0x0000007b无法正常启动的解决方法
电脑出现蓝屏是十分常见的故障之一,而在蓝屏的时候会提示蓝屏代码,不同的蓝屏代码代表不同的意义,让我们快速的了解电脑是什么问题.那么遇到电脑蓝屏代码0x0000007b怎么办?下面装机之家分享一下电脑蓝 ...
- 【SemiDrive源码分析】【X9芯片启动流程】26 - R5 SafetyOS 之 LK_INIT_LEVEL_TARGET 阶段代码流程分析(TP Drvier、Audio Server初始化)
[SemiDrive源码分析][X9芯片启动流程]26 - R5 SafetyOS 之 LK_INIT_LEVEL_TARGET 阶段代码流程分析(TP Drvier.Audio Server初始化) ...
- 美国总统奥巴马不仅呼吁所有人都学习编程,甚至以身作则编写代码,成为美国历史上首位编写计算机代码的总统。2014年底,为庆祝“计算机科学教育周”正式启动,奥巴马编写了很简单的计算机代码:在屏幕上画一个正
美国总统奥巴马不仅呼吁所有人都学习编程,甚至以身作则编写代码,成为美国历史上首位编写计算机代码的总统.2014年底,为庆祝"计算机科学教育周"正式启动,奥巴马编写了很简单的计算机代 ...
- 老码农冒死揭开行业黑幕:如何编写无法维护的代码
[程序员的那些事 注]:这是一篇非常经典的文章,我们以前发过多次.虽然部分内容是针对 Java 语言,但其他部分对所有编程语言都有参考意义.今天重新推荐给新读者朋友,老朋友也值得重温. 编译:我们专栏 ...
- 什么样的代码是好代码_什么是好代码?
什么样的代码是好代码 编码最佳实践 (Coding Best-Practices) In the following section, I will introduce the topic at ha ...
- 如何编写无法维护的代码(现实中有的程序员就是这么干的)
让自己稳拿铁饭碗 ;-) – Roedy Green(翻译版略有删节) http://blog.jobbole.com/80241/ 简介 永远不要(把自己遇到的问题)归因于(他人的)恶意,这恰恰说明 ...
最新文章
- Gradle For Android
- 数据库之字段数据类型
- 【错误记录】SeeMusic 内存错误 ( 内存占用率 100 % | 清除系统设置信息 )
- Java 接口实现计算器加减乘除(字符交互界面)
- Java 程序中的多线程
- 《解释的工具:生活中的经济学原理 读书笔记6》
- Django 框架篇: 一. Django介绍; 二. 安装; 三. 创建项目;
- 决策树数学原理(ID3,c4.5,cart算法)
- int最大值java_Java 中一个int型数组的求最大值最小值 | 学步园
- SpringBoot精通系列-使用Mybatis Generator生成Dao层代码
- c语言返回二叉树的大小,C语言中计算二叉树的宽度的两种方式
- .h文件包含后显示类型_C语言中#include只能包含.h文件吗?
- 瑞信:区块链技术还在半山腰 2025年才能真正成熟
- 研发体系核心代码和文档安全保护方案
- linux 查看链接文件,Linux下的链接文件详解
- 【日常点滴013】python雪球网沪深港美股情爬取
- 微信分享网页 图标不显示
- WPS文档:格式显示,页码标注,公式居中编号右对齐,公式编号不能在行中间显示
- idea 占用内存优化调整
- C#通用类库整理--字符串处理类
热门文章
- 五一玩转辽宁之二 本溪野炊自助行
- 智驾域的时间同步方案(PPS+PPT)
- padavan搭建网页认证服务器,搭建 LNMP 的 Wifidog 网页认证服务 教程 感谢 elinpr0s 提供...
- css nodeType
- 女人 让自己越吃越漂亮(图)
- C++实现99乘法表
- 组合学笔记(一)偏序集概念与应用
- 东芝高管:SSD硬盘永远无法取代机械硬盘!你怎么看?
- 设计模式(五) —— 行为型模式(中)
- linux 脚本 “=~” 表达 意思