datax生产环境启动运行是通过datax.py启动的,如下:

$ python datax.py job/{YOUR_JOB.json}

这篇文章就是打算解读下datax.py这个源码。

我们从main函数开始,沿着程序的执行流程慢慢解读。

if __name__ == "__main__":printCopyright()    //打印版权信息parser = getOptionParser()  //获取参数解析器options, args = parser.parse_args(sys.argv[1:]) //解析参数if options.reader is not None and options.writer is not None://如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板generateJobConfigTemplate(options.reader,options.writer)sys.exit(RET_STATE['OK'])if len(args) != 1:parser.print_help()sys.exit(RET_STATE['FAIL'])startCommand = buildStartCommand(options, args) //构建job命令# print startCommandchild_process = subprocess.Popen(startCommand, shell=True)  //启动java子进程执行真正的命令(也就是job进程)register_signal()(stdout, stderr) = child_process.communicate()sys.exit(child_process.returncode)

我代码里加了注释,可以看到步骤都很清晰。一共有四个大步骤:

  • 1.打印datax版权信息
  • 2.获取参数解析器解析参数
  • 3.构建启动命令
  • 4.启动java子进程

下面依次展开这个4个流程详细解读。

打印版权信息

这个简单,省略。

获取参数解析器解析参数

def getOptionParser():usage = "usage: %prog [options] job-url-or-path"parser = OptionParser(usage=usage)prodEnvOptionGroup = OptionGroup(parser, "Product Env Options","Normal user use these options to set jvm parameters, job runtime mode etc. ""Make sure these options can be used in Product Env.")prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",default=DEFAULT_JVM, help="Set jvm parameters if necessary.")prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",help="Set job unique id when running by Distribute/Local Mode.")prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",action="store", default="standalone",help="Set job runtime mode such as: standalone, local, distribute. ""Default mode is standalone.")prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",action="store", dest="params",help='Set job parameter, eg: the source tableName you want to set it by command, ''then you can use like this: -p"-DtableName=your-table-name", ''if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".''Note: you should config in you job tableName with ${tableName}.')prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",action="store", dest="reader",type="string",help='View job config[reader] template, eg: mysqlreader,streamreader')prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",action="store", dest="writer",type="string",help='View job config[writer] template, eg: mysqlwriter,streamwriter')parser.add_option_group(prodEnvOptionGroup)devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options","Developer use these options to trace more details of DataX.")devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",help="Set to remote debug mode.")devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",default="info", help="Set log level such as: debug, info, all etc.")parser.add_option_group(devEnvOptionGroup)return parser

这里使用python内置的OptionParser来构建parser,通过usage我们知道执行命令的姿势是

datax.py [options] job-url-or-path

options就再接下来的代码中通过add_option来添加。比如我们可以用类似如下的方式执行:

python datax.py -r txtReader -w txtFileWriter

注意到上面的示例并没有指定json文件,因为datax会自动从github拉取对应插件的json的模版给我们,这块的处理正是下面这个代码:

if options.reader is not None and options.writer is not None:generateJobConfigTemplate(options.reader,options.writer)

构建启动命令

def buildStartCommand(options, args):commandMap = {}tempJVMCommand = DEFAULT_JVMif options.jvmParameters:tempJVMCommand = tempJVMCommand + " " + options.jvmParametersif options.remoteDebug:tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIGprint 'local ip: ', getLocalIp()if options.loglevel:tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))if options.mode:commandMap["mode"] = options.mode# jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)jobResource = args[0]if not isUrl(jobResource):jobResource = os.path.abspath(jobResource)if jobResource.lower().startswith("file://"):jobResource = jobResource[len("file://"):]jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))if options.params:jobParams = jobParams + " " + options.paramsif options.jobid:commandMap["jobid"] = options.jobidcommandMap["jvm"] = tempJVMCommandcommandMap["params"] = jobParamscommandMap["job"] = jobResourcereturn Template(ENGINE_COMMAND).substitute(**commandMap)

流程如下:

  1. 处理jvm参数
  2. 处理datax参数(要执行的json,日志等)
  3. 组装java命令

可以通过

print startCommand

打印最终的命令看看,就是一个标准的java命令。类似下面这种:

    # java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\xxxx\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\xxxx\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\xxxx\github\DataX\target\datax\datax/lib/*  -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\xxxx\github\DataX\target\datax\datax\job\job.json

datax的启动文体datax.py解析相关推荐

  1. 大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署

    文章目录 1. DataX简介 1.1 DataX概述 1.2 DataX支持的数据源 2. DataX架构原理 2.1 DataX设计理念 2.2 DataX框架设计 2.3 DataX运行流程 2 ...

  2. 大数据技术之DataX (一)DataX插件开发

    文章目录 一.背景 二.基于java的本地测试datax 2.1 github上下载datax的源代码 2.2 datax代码导入idea 三.docker安装南大通用数据库GBase和GBase 8 ...

  3. 根据名称获得treeview节点_冶金行业首个!中冶赛迪正式启动工业互联网标识解析二级节点建设...

    近日,国家工业和信息化部信息技术发展司公示了"2020年工业互联网创新发展工程-工业互联网标识解析二级节点(特定行业应用服务平台)项目"候选人名称.由中冶赛迪重庆信息技术有限公司牵 ...

  4. [日更-2019.4.8、4.9、4.12、4.13] cm-14.1 Android系统启动过程分析(一)-init进程的启动、rc脚本解析、zygote启动、属性服务...

    2019独角兽企业重金招聘Python工程师标准>>> 声明 前阶段在项目中涉及到了Android系统定制任务,Android系统定制前提要知道Android系统是如何启动的. 本文 ...

  5. YOLOv3 代码详解(2) —— 数据处理 dataset.py解析:输入图片增强、制作模型的每层输出的标签

    前言: yolo系列的论文阅读 论文阅读 || 深度学习之目标检测 重磅出击YOLOv3 论文阅读 || 深度学习之目标检测yolov2 论文阅读 || 深度学习之目标检测yolov1   该篇讲解的 ...

  6. PASCAL VOC的评估代码voc_eval.py解析

    参考  PASCAL VOC的评估代码voc_eval.py解析 - 云+社区 - 腾讯云 目录 1.读检测的结果 2.解析一幅图像中的目标数 3.计算AP 4.VOC的评估 5.进行python评估 ...

  7. 嵌入式linux的u-boot系统启动过程,嵌入式linux操作系统u-boot启动顺序以及代码解析...

    嵌入式linux操作系统u-boot启动顺序以及代码解析 (9页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦! 9.9 积分 Bootloader/u-bo ...

  8. MMDetection框架的anchor_generators.py解析与船数据解析

    anchor_generators.py解析 import mmcv import numpy as np import torch from torch.nn.modules.utils impor ...

  9. caffe实战之classify.py解析

    本文将对caffe/python下的classify.py代码以及相关的classifier.py和io.py进行解析. 一.classify.py 由最后的if__name__ == '__main ...

最新文章

  1. python interpreter 中没有torch_PyTorch扩展自定义PyThon/C++(CUDA)算子的若干方法总结
  2. Excel双样本T检验之成对检验
  3. flink与flink-client的版本对应
  4. xss 表单劫持(from通用明文记录)
  5. 体验VSTS源代码管理之一
  6. 深入理解HTTP协议
  7. mysql建库权限_mysql数据库用户权限及建库脚本
  8. C#中is vs as 1
  9. 模拟时钟在LCD上的显示
  10. ORA-01502-对应的快速解决办法(索引或这类索引的分区处于不可用状态)
  11. SpringMVC学习指南【笔记4】数据绑定、表单标签库、转换器、格式化、验证器
  12. 计算机网络拓扑图 模板,网络拓扑图绘制.doc
  13. debounce 防抖函数
  14. 软件技术基础(一):绪论
  15. 奇偶页不同页眉页脚设置
  16. 在线约会其实就是网络泡妞,Meexo 反其道而行为你隐姓埋名
  17. C语言中的TRUE和FALSE
  18. 华为android5.1版本,华为P10国行版搭载的EMUI5.1怎么样?
  19. pool win10提示bad_Win10怎么修复出现bad pool header蓝屏的情况?
  20. 转转验机源码带验机报告

热门文章

  1. jqweui表单日期只选年月的问题
  2. PostGIS 3.1.2软件安装详细教程(地图工具篇.8)
  3. matlab处理数字岩心图像,一种用神经网络进行高质量数字岩心图像处理的方法与流程...
  4. ArcGIS10.2安装教程(win11版)
  5. d3007(d3007经过站点和时间)
  6. 贯穿整个AUTOSAR架构的Interface
  7. Ubuntu 下安装 SVN 服务端
  8. Mac App Store下载总是发生错误如何解决?
  9. 用VHDL编写数字时钟
  10. 微信公众平台注册所需资料(服务号和小程序一样)