datax源码解析-JobContainer的初始化阶段解析

写在前面

此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。

我所使用的任务模版的json文件是:

{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"column":["id","name","age"],"connection":[{"jdbcUrl":["jdbc:mysql://127.0.0.1:3306/test"],"table":["t_datax_test"]}],"password":"11111111","username":"root"}},"writer":{"name":"mysqlwriter","parameter":{"column":["id","name","age"],"connection":[{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test2","table":["t_datax_test"]}],"password":"11111111","username":"root"}}}],"setting":{"speed":{"channel":"2"}}}
}

另外就是,我在阅读源码的时候加入了很多注释,大家看我贴出来的代码要注意看注释哦。贴出来的代码有些可能只是包含核心的片段。

JobContainer初始化阶段

接着上篇文章:

datax源码解析-启动类分析

进入JobContainer的start方法,jobContainer主要负责的工作全部在start()里面,包括:

  • preHandle,前置处理
  • init,初始化,主要是调用插件的init方法实现初始化
  • prepare,准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作
  • split,根据配置的并发参数,对job进行切分,切分为多个task
  • scheduler,把上一步reader和writer split的结果整合到taskGroupContainer,启动任务
  • post,执行完任务后的后置操作
  • invokeHooks,DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等

从代码中看,也可以清晰的看到这几个过程:

public void start() {LOG.info("DataX jobContainer starts job.");boolean hasException = false;boolean isDryRun = false;try {this.startTimeStamp = System.currentTimeMillis();isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);if(isDryRun) {LOG.info("jobContainer starts to do preCheck ...");this.preCheck();} else {//线程安全考虑userConf = configuration.clone();LOG.debug("jobContainer starts to do preHandle ...");初始化preHandler插件并执行插件的preHandlerthis.preHandle();LOG.debug("jobContainer starts to do init ...");//初始化reader和writerthis.init();LOG.info("jobContainer starts to do prepare ...");//准备工作,比如清空目标表。内部还是调用各类型插件的方法来实现准备工作。this.prepare();LOG.info("jobContainer starts to do split ...");//拆分task,实际的拆分工作还是调用插件的实现this.totalStage = this.split();LOG.info("jobContainer starts to do schedule ...");//把上一步reader和writer split的结果整合到taskGroupContainer,启动任务this.schedule();LOG.debug("jobContainer starts to do post ...");//执行任务后的操作this.post();LOG.debug("jobContainer starts to do postHandle ...");//不知道是干啥的this.postHandle();LOG.info("DataX jobId [{}] completed successfully.", this.jobId);//DataX 的 Hook 机制,比如我们可以实现将datax任务的执行结果发送的邮箱,短信通知等this.invokeHooks();...

本篇文章只关注前面三个部分,也就是preHandle,init,prepare三个阶段,我认为这三个阶段都属于任务开始前的初始化阶段。

preHandler

preHandler目前官方也没有实现,com.alibaba.datax.common.plugin.AbstractPlugin#preHandler方法目前是空的,所以这里我们也先略过。

init

继续看init方法,

private void init() {//从配置中获取jobidthis.jobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);if (this.jobId < 0) {LOG.info("Set jobId = 0");this.jobId = 0;this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,this.jobId);}Thread.currentThread().setName("job-" + this.jobId);//DataX所有的状态及统计信息交互类,job、taskGroup、task等的消息汇报JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());//必须先Reader ,后Writerthis.jobReader = this.initJobReader(jobPluginCollector);//writer的初始化做的事情会多一些,比如会检查写入表的字段和指定的字段个数是否一致等this.jobWriter = this.initJobWriter(jobPluginCollector);}

可以看到,init方法分别调用的是reader和writer的init方法进行初始化。先来看下initJobReader方法,

private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {this.readerPluginName = this.configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));//loadJobPlugin需要用到jarLoaderReader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);// 设置reader的jobConfigjobReader.setPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));// 设置reader的readerConfigjobReader.setPeerPluginJobConf(this.configuration.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));jobReader.setJobPluginCollector(jobPluginCollector);//调用插件自己内部的init方法进行个性初始化,以mysql的初始化为例//mysql reader会检查username,password等是否存在jobReader.init();classLoaderSwapper.restoreCurrentThreadClassLoader();return jobReader;}

首先看这个方法返回的是Reader.Job这样的一个内部类,这个类是AbstractJobPlugin的一个实现。所以返回的其实是一个reader插件的实例。

接着看到是com.alibaba.datax.core.util.container.LoadUtil#getJarLoader方法,它根据类型和名称从缓存中获取,如果没有则去创建,创建的流程首先获取插件的路径.比如:D:\DataX\target\datax\datax\plugin\reader\mysqlreader,然后根据JarLoader里面的getURLs(paths)获取插件路径下所有的jar包。创建单独的JarLoader,把创建的JarLoader缓存起来。

然后它返回一个是一个自定义的类加载器JarLoader,根据java类加载器的原理我们知道,JarLoader是Application ClassLoader的子类。DataX通过Thread.currentThread().setContextClassLoader在每次对插件调用前后的进行classLoader的切换实现jar隔离的加载机制。

接下里的loadJobPlugin就会用到这个类加载器去实例化插件的实现类。

插件加载这部分的设计还是值得学习的,即实现了jar的隔离加载,也实现了热加载功能。

最后就是调用插件本身的init方法,以mysql为例,这里主要是检查 username/password 配置是否存在等。

writer的初始化流程基本是一样的,这里不展开了。

prepare

prepare也是调用插件的prepare方法进行准备阶段的工作,

private void prepare() {this.prepareJobReader();this.prepareJobWriter();}

mysql reader的prepare没有实现,意味着不需要prepare,我们直接来看下writer的prepare方法。

// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)public void prepare(Configuration originalConfig) {int tableNumber = originalConfig.getInt(Constant.TABLE_NUMBER_MARK);if (tableNumber == 1) {String username = originalConfig.getString(Key.USERNAME);String password = originalConfig.getString(Key.PASSWORD);List<Object> conns = originalConfig.getList(Constant.CONN_MARK,Object.class);Configuration connConf = Configuration.from(conns.get(0).toString());// 这里的 jdbcUrl 已经 append 了合适后缀参数String jdbcUrl = connConf.getString(Key.JDBC_URL);originalConfig.set(Key.JDBC_URL, jdbcUrl);//表名String table = connConf.getList(Key.TABLE, String.class).get(0);originalConfig.set(Key.TABLE, table);//如果有需要提前执行的sql,比如清空表等List<String> preSqls = originalConfig.getList(Key.PRE_SQL,String.class);/*** sql转换,比如把@table换成实际的table name**/List<String> renderedPreSqls = WriterUtil.renderPreOrPostSqls(preSqls, table);originalConfig.remove(Constant.CONN_MARK);if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {// 说明有 preSql 配置,则此处删除掉originalConfig.remove(Key.PRE_SQL);Connection conn = DBUtil.getConnection(dataBaseType,jdbcUrl, username, password);LOG.info("Begin to execute preSqls:[{}]. context info:{}.",StringUtils.join(renderedPreSqls, ";"), jdbcUrl);WriterUtil.executeSqls(conn, renderedPreSqls, jdbcUrl, dataBaseType);DBUtil.closeDBResources(null, null, conn);}}

其实prepare的核心思想就是,看下任务的配置文件有没有需要提前执行的sql,比如清空表之类的,有的话就先执行了。


参考:

  • https://www.cnblogs.com/xmzpc/p/15193622.html

datax源码解析-JobContainer的初始化阶段解析相关推荐

  1. Uboot中start.S源码的指令级的详尽解析

    Uboot中start.S源码的指令级的详尽解析 版本:v1.9 Crifan Li 摘要 本文对Uboot中的Start.S的源码的几乎每一行,都进行了详细的解析 本文提供多种格式供: 在线阅读 H ...

  2. datax源码环境搭建

    文章目录 datax源码环境搭建 写在前面 环境 下载源码并编译 错误1 错误2 运行 生产模式 debug模式 datax源码环境搭建 写在前面 DataX 是阿里巴巴集团内被广泛使用的离线数据同步 ...

  3. Uboot中start.S源码的指令级的详尽解析 在线版

    http://bbs.chinaunix.net/thread-2312785-1-1.html 以后所有内容更新,都放在这里: Uboot中start.S源码的指令级的详尽解析 在线版 Uboot ...

  4. php tire树,Immutable.js源码之List 类型的详细解析(附示例)

    本篇文章给大家带来的内容是关于Immutable.js源码之List 类型的详细解析(附示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 一.存储图解 我以下面这段代码为例子,画 ...

  5. Spring-bean的循环依赖以及解决方式___Spring源码初探--Bean的初始化-循环依赖的解决

    本文主要是分析Spring bean的循环依赖,以及Spring的解决方式. 通过这种解决方式,我们可以应用在我们实际开发项目中. 什么是循环依赖? 怎么检测循环依赖 Spring怎么解决循环依赖 S ...

  6. 【iScroll源码学习01】准备阶段 - 叶小钗

    [iScroll源码学习01]准备阶段 - 叶小钗 时间 2013-12-29 18:41:00 博客园-原创精华区 原文  http://www.cnblogs.com/yexiaochai/p/3 ...

  7. 找一个网页,用浏览器查看源码并复制,然后尝试解析一下HTML,输出Python官网发布的会议时间、名称和地点——python学习笔记

    1. 题目: 找一个网页,例如https://www.python.org/events/python-events/,用浏览器查看源码并复制,然后尝试解析一下HTML,输出Python官网发布的会议 ...

  8. Spring IOC 容器源码分析 - 余下的初始化工作

    1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...

  9. Uboot中start.S源码的指令级的详尽解析(转)

    Uboot中start.S源码的指令级的详尽解析 转载于:https://www.cnblogs.com/LittleTiger/p/10877516.html

最新文章

  1. pycharm代码模板设置
  2. 「Vueconf」探索 Vue3 中 的 JSX
  3. C语言题目地图上有m个城市,序号依次为1,2,3....m,刚开始你在1,若每次只能从当前城市去往当前序号加1或者加3的城市,要到达m城市(m3),有多少种走法
  4. 深度学习计算机视觉的简介_商业用途计算机视觉简介
  5. 两种方式打开jar文件
  6. 从研发效能的视角谈“故障复盘”
  7. 封装PC端使用海康插件播放摄像头直播流(VUE)
  8. 点我—— ASP.NETCORE 安装CentOS
  9. 132,怎样理解帝国和王朝的兴衰
  10. git merge工具 meld
  11. Debian Iptables 配置教程
  12. 小程序webview嵌入h5兼容iphone安全区域
  13. vue中路由传参方式之二(this.$router.push进行编程式路由跳转传参)
  14. oracle direct path read temp,direct path read/read temp等待事件
  15. hive java.net.SocketTimeoutException: Read timed out 问题解
  16. 计蒜客题解——T1769:最大岛屿
  17. WTL_Freecell-Release05-v1.3-20190129 (WTL空当接龙v1.3)
  18. kali linux超级用户_Kali Linux操作系统将从本月发布的新版本开始默认不再使用root账户...
  19. GBU1510-ASEMI电源专用15A整流桥GBU1510
  20. 从新建文件夹开始构建ShadowPlay Engine游戏引擎(4)

热门文章

  1. 用爬虫分析沪深300指数超长走势
  2. 不可思议有氧机器人_不思议迷宫蒸汽之都机器人彩蛋获得攻略
  3. 多进程,多线程 的使用
  4. css3 滚动条样式
  5. 【报告分享】2022中国化妆品行业发展趋势洞察-易观分析(附下载)
  6. Java中字符串CST的时间日期转换
  7. STM32F103C8T6制作舵机测试仪详细图文教程 | 定时器触发ADC | DMA传输 | PWM输出 | RTC实时时钟 | USART串口输出 | OLED IIC显示
  8. 安卓基础知识之Activity篇(一):Activity生命周期
  9. 乖乖兽|3种超神减肥妙招,跟肥胖say拜拜!
  10. python stats画正态分布、指数分布、对数正态分布的QQ图