这是一篇让你受益匪浅的文章,代码即使人生。

worker启动比server启动要复杂一些,毕竟worker是要实际干活的,工欲善其事必先利其器,所以需要准备的工具还是不能少的,server对于powerjob来说,只是一个调度用的,说白了就是管理worker做什么工作的,只需要给他一个流程,让他按照流程上的内容,一次告诉worker去工作,至于怎么做,只有worker知道,server当然不会知道的,也没有必要知道。

worker的启动大概分为以下这么几个步骤:

  1. 判断是否重复初始化

  2. 获取默认配置

  3. 校验appName

  4. 获取IP地址和端口(这一步和server端是一样的,在这里就不赘述了)

  5. 初始化定时线程

  6. 连接server

  7. 初始化Akka

  8. 初始化日志系统

  9. 初始化存储

  10. 初始化定时任务

步骤是蛮多的,但是其实都不是非常的复杂

由于worker的启动源码过于多了,就不全贴出来了。

开胃菜

首先因为该worker包是需要被依赖的,所以并没有spring的启动类,但是却有启动spring时添加其配置的内容,在worker包里面的PowerJobWorker类,是实现了ApplicationContextAware, InitializingBean, DisposableBean这三个类,这三个类默认有三个方法,分别是

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException

public void afterPropertiesSet() throws Exception

public void destroy() throws Exception

99.9%的初始化工作都是在afterPropertiesSet这个方法里完成的,看名字大概也能猜出这个方法的意思,就是字面意思。

判断是否重复初始化

if (!initialized.compareAndSet(false, true)) {log.warn("[PowerJobWorker] please do not repeat the initialization");return;
}

这段代码意思就是一个initialized的变量,代表的意思是是否初始化,一开始的时候是false,因为还没有开始初始化,然后compareAndSet后面跟着两个参数,第一个参数是预期值,如果预期值和当前的变量值一样,则将当前变量更新为第二个参数的值。

如果initialized的值是false ,和预期值一样,则compareAndSet方法返回的是true,跳出if条件语句,并且initialized值变成了true。

如果initialized的·值是true,和预期值不一样,则compareAndSet返回的是false,进入条件语句,打印告警日志,并且不再有后续的初始化操作,此时initialized的值不变,依旧是true。

获取默认配置

PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();//下面这些代码都是在之后的初始化操作中进行赋值的
workerRuntime.setWorkerAddress(workerAddress);workerRuntime.setServerDiscoveryService(serverDiscoveryService);workerRuntime.setActorSystem(actorSystem);workerRuntime.setOmsLogHandler(omsLogHandler);workerRuntime.setTaskPersistenceService(taskPersistenceService);

这个WorkerRuntime类是worker.common包里面的一个Bean类,记录了一些worker运行时的参数和环境,里面有的有默认值,有的没有默认值,需要在初始化的时候进行赋值。比如上面代码中,后面set的值

校验appName

我将里面有关打印日志的部分全部拿掉了,通过appName,去server请求appId,如果请求不到,则说明配置文件里面的“powerjob.worker.app-name”配置的有问题,所有appName都是需要注册的,所以名字是不会重复的。

private void assertAppName() {//获取到appNamePowerJobWorkerConfig config = workerRuntime.getWorkerConfig();String appName = config.getAppName();Objects.requireNonNull(appName, "appName can't be empty!");//调用server端的服务String url = "http://%s/server/assert?appName=%s";for (String server : config.getServerAddress()) {
//获取到server的请求地址String realUrl = String.format(url, server, appName);try {
//请求服务,返回结果String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
//解析返回结果ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);if (resultDTO.isSuccess()) {
//将appId设置到运行环境里Long appId = Long.valueOf(resultDTO.getData().toString());workerRuntime.setAppId(appId);return;}else {throw new PowerJobException(resultDTO.getMessage());}}catch (PowerJobException oe) {throw oe;}catch (Exception ignore) {}}throw new PowerJobException("no server available!");
}

连接Server

serverDiscoveryService.start(timingPool);

最主要的就是上面这一行代码,这个代码里面主要流程如下:

  1. 将配置文件里面的服务器地址存入内存。

  2. 当前服务地址如果不为空,调用server端的/acquire服务获取结果。

  3. 如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。

  4. 如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。

  5. 如果得到结果,则将结果返回。

    private String discovery() {
    //1.将配置文件里面的服务器地址存入内存。if (ip2Address.isEmpty()) {config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));}String result = null;
    //2.当前服务地址如果不为空,调用server端的/acquire服务获取结果。String currentServer = currentServerAddress;if (!StringUtils.isEmpty(currentServer)) {String ip = currentServer.split(":")[0];String firstServerAddress = ip2Address.get(ip);if (firstServerAddress != null) {result = acquire(firstServerAddress);}}
    //3.如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。for (String httpServerAddress : config.getServerAddress()) {if (StringUtils.isEmpty(result)) {result = acquire(httpServerAddress);}else {break;}}if (StringUtils.isEmpty(result)) {
    //4.如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。if (FAILED_COUNT++ > MAX_FAILED_COUNT) {List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();if (!CollectionUtils.isEmpty(frequentInstanceIds)) {frequentInstanceIds.forEach(instanceId -> {TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);taskTracker.destroy();});}FAILED_COUNT = 0;}return null;} else {
    //5.如果得到结果,则将结果返回。FAILED_COUNT = 0;return result;}
    }

    初始化日志系统

    OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);

    这个日志系统的主要作用,就是将本地的日志上报的server上,从传进的参数就能看出,都是和通讯相关的内容。

    这个日志系统的提交也是异步单独占用一个线程,在之前开启的线程中,其中就有一个线程是用来提交日志的,该线程会在worker启动的最后开启,代码段如下:

    timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

    固定每5秒提交一次日志。

    初始化存储

    worker使用的是本地的H2数据库,持久化的策略分为磁盘和内存,在worker停止的时候,会将本地的数据文件全部销毁。其主要的初始化代码在worker.persistence包里面的ConnectionFactory类中,源代码如下:

  6. private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
    private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);public synchronized void initDatasource(StoreStrategy strategy) {strategy = strategy == null ? StoreStrategy.DISK : strategy;HikariConfig config = new HikariConfig();config.setDriverClassName(Driver.class.getName());config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);config.setAutoCommit(true);// 池中最小空闲连接数量config.setMinimumIdle(2);// 池中最大连接数量config.setMaximumPoolSize(32);dataSource = new HikariDataSource(config);try {FileUtils.forceDeleteOnExit(new File(H2_PATH));}catch (Exception ignore) {}
    }

    HikariCP 是一个高性能的 JDBC 连接池组件,HikariConfig 就是其相关的配置类。

    总结

  7. worker工作起来确实不是很容易,需要找到自己的上级,还需要记录自己工作的日志,需要一个人干好多任务,还需要再不耽误正常任务的同时,向自己的上级汇报工作,汇报自己的身体状态。简直就是我们底层程序员的真实写照啊。里面使用了很多经典的技术,也有比较新的技术,对于日志系统,做的还是让我学到了很多。

powerjob的worker启动,研究完了这块代码之后我发现了,代码就是现实中我们码农的真实写照相关推荐

  1. 用 VR 检查代码,码农们的必备神器!

    点击上方"CSDN",选择"置顶公众号" 关键时刻,第一时间送达! "跳进代码库,感觉自己就是宇宙的中心."这是近期网友@沈寒爸的一句评论, ...

  2. 【代码质量】码农提升代码质量的三重境界

    前言:文章的开头,我要先讲个段子(真实案例),玩过拳皇的同学应该对下面这张图不陌生(一不小心暴露年龄了): 本以为这种也就是作为笑料,有意为之,然而就在我工作的第三年,确实碰到了这样的代码,老夫掰起脚 ...

  3. Spark Worker启动源码

    worker的启动流程和master的启动流程相同,区别就是在worker启动完成后,要向Master注册.汇报资源.在Worker的onStart()方法中会调用registerWithMaster ...

  4. 计算机蓝屏无法启动代码50,电脑蓝屏代码0x0000007b无法正常启动的解决方法

    电脑出现蓝屏是十分常见的故障之一,而在蓝屏的时候会提示蓝屏代码,不同的蓝屏代码代表不同的意义,让我们快速的了解电脑是什么问题.那么遇到电脑蓝屏代码0x0000007b怎么办?下面装机之家分享一下电脑蓝 ...

  5. 【SemiDrive源码分析】【X9芯片启动流程】26 - R5 SafetyOS 之 LK_INIT_LEVEL_TARGET 阶段代码流程分析(TP Drvier、Audio Server初始化)

    [SemiDrive源码分析][X9芯片启动流程]26 - R5 SafetyOS 之 LK_INIT_LEVEL_TARGET 阶段代码流程分析(TP Drvier.Audio Server初始化) ...

  6. 美国总统奥巴马不仅呼吁所有人都学习编程,甚至以身作则编写代码,成为美国历史上首位编写计算机代码的总统。2014年底,为庆祝“计算机科学教育周”正式启动,奥巴马编写了很简单的计算机代码:在屏幕上画一个正

    美国总统奥巴马不仅呼吁所有人都学习编程,甚至以身作则编写代码,成为美国历史上首位编写计算机代码的总统.2014年底,为庆祝"计算机科学教育周"正式启动,奥巴马编写了很简单的计算机代 ...

  7. 老码农冒死揭开行业黑幕:如何编写无法维护的代码

    [程序员的那些事 注]:这是一篇非常经典的文章,我们以前发过多次.虽然部分内容是针对 Java 语言,但其他部分对所有编程语言都有参考意义.今天重新推荐给新读者朋友,老朋友也值得重温. 编译:我们专栏 ...

  8. 什么样的代码是好代码_什么是好代码?

    什么样的代码是好代码 编码最佳实践 (Coding Best-Practices) In the following section, I will introduce the topic at ha ...

  9. 如何编写无法维护的代码(现实中有的程序员就是这么干的)

    让自己稳拿铁饭碗 ;-) – Roedy Green(翻译版略有删节) http://blog.jobbole.com/80241/ 简介 永远不要(把自己遇到的问题)归因于(他人的)恶意,这恰恰说明 ...

最新文章

  1. Gradle For Android
  2. 数据库之字段数据类型
  3. 【错误记录】SeeMusic 内存错误 ( 内存占用率 100 % | 清除系统设置信息 )
  4. Java 接口实现计算器加减乘除(字符交互界面)
  5. Java 程序中的多线程
  6. 《解释的工具:生活中的经济学原理 读书笔记6》
  7. Django 框架篇: 一. Django介绍; 二. 安装; 三. 创建项目;
  8. 决策树数学原理(ID3,c4.5,cart算法)
  9. int最大值java_Java 中一个int型数组的求最大值最小值 | 学步园
  10. SpringBoot精通系列-使用Mybatis Generator生成Dao层代码
  11. c语言返回二叉树的大小,C语言中计算二叉树的宽度的两种方式
  12. .h文件包含后显示类型_C语言中#include只能包含.h文件吗?
  13. 瑞信:区块链技术还在半山腰 2025年才能真正成熟
  14. 研发体系核心代码和文档安全保护方案
  15. linux 查看链接文件,Linux下的链接文件详解
  16. 【日常点滴013】python雪球网沪深港美股情爬取
  17. 微信分享网页 图标不显示
  18. WPS文档:格式显示,页码标注,公式居中编号右对齐,公式编号不能在行中间显示
  19. idea 占用内存优化调整
  20. C#通用类库整理--字符串处理类

热门文章

  1. 五一玩转辽宁之二 本溪野炊自助行
  2. 智驾域的时间同步方案(PPS+PPT)
  3. padavan搭建网页认证服务器,搭建 LNMP 的 Wifidog 网页认证服务 教程 感谢 elinpr0s 提供...
  4. css nodeType
  5. 女人 让自己越吃越漂亮(图)
  6. C++实现99乘法表
  7. 组合学笔记(一)偏序集概念与应用
  8. 东芝高管:SSD硬盘永远无法取代机械硬盘!你怎么看?
  9. 设计模式(五) —— 行为型模式(中)
  10. linux 脚本 “=~” 表达 意思