datax的启动文体datax.py解析
datax生产环境启动运行是通过datax.py启动的,如下:
$ python datax.py job/{YOUR_JOB.json}
这篇文章就是打算解读下datax.py这个源码。
我们从main函数开始,沿着程序的执行流程慢慢解读。
if __name__ == "__main__":printCopyright() //打印版权信息parser = getOptionParser() //获取参数解析器options, args = parser.parse_args(sys.argv[1:]) //解析参数if options.reader is not None and options.writer is not None://如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板generateJobConfigTemplate(options.reader,options.writer)sys.exit(RET_STATE['OK'])if len(args) != 1:parser.print_help()sys.exit(RET_STATE['FAIL'])startCommand = buildStartCommand(options, args) //构建job命令# print startCommandchild_process = subprocess.Popen(startCommand, shell=True) //启动java子进程执行真正的命令(也就是job进程)register_signal()(stdout, stderr) = child_process.communicate()sys.exit(child_process.returncode)
我代码里加了注释,可以看到步骤都很清晰。一共有四个大步骤:
- 1.打印datax版权信息
- 2.获取参数解析器解析参数
- 3.构建启动命令
- 4.启动java子进程
下面依次展开这个4个流程详细解读。
打印版权信息
这个简单,省略。
获取参数解析器解析参数
def getOptionParser():usage = "usage: %prog [options] job-url-or-path"parser = OptionParser(usage=usage)prodEnvOptionGroup = OptionGroup(parser, "Product Env Options","Normal user use these options to set jvm parameters, job runtime mode etc. ""Make sure these options can be used in Product Env.")prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",default=DEFAULT_JVM, help="Set jvm parameters if necessary.")prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",help="Set job unique id when running by Distribute/Local Mode.")prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",action="store", default="standalone",help="Set job runtime mode such as: standalone, local, distribute. ""Default mode is standalone.")prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",action="store", dest="params",help='Set job parameter, eg: the source tableName you want to set it by command, ''then you can use like this: -p"-DtableName=your-table-name", ''if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".''Note: you should config in you job tableName with ${tableName}.')prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",action="store", dest="reader",type="string",help='View job config[reader] template, eg: mysqlreader,streamreader')prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",action="store", dest="writer",type="string",help='View job config[writer] template, eg: mysqlwriter,streamwriter')parser.add_option_group(prodEnvOptionGroup)devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options","Developer use these options to trace more details of DataX.")devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",help="Set to remote debug mode.")devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",default="info", help="Set log level such as: debug, info, all etc.")parser.add_option_group(devEnvOptionGroup)return parser
这里使用python内置的OptionParser来构建parser,通过usage我们知道执行命令的姿势是
datax.py [options] job-url-or-path
options就再接下来的代码中通过add_option来添加。比如我们可以用类似如下的方式执行:
python datax.py -r txtReader -w txtFileWriter
注意到上面的示例并没有指定json文件,因为datax会自动从github拉取对应插件的json的模版给我们,这块的处理正是下面这个代码:
if options.reader is not None and options.writer is not None:generateJobConfigTemplate(options.reader,options.writer)
构建启动命令
def buildStartCommand(options, args):commandMap = {}tempJVMCommand = DEFAULT_JVMif options.jvmParameters:tempJVMCommand = tempJVMCommand + " " + options.jvmParametersif options.remoteDebug:tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIGprint 'local ip: ', getLocalIp()if options.loglevel:tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))if options.mode:commandMap["mode"] = options.mode# jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)jobResource = args[0]if not isUrl(jobResource):jobResource = os.path.abspath(jobResource)if jobResource.lower().startswith("file://"):jobResource = jobResource[len("file://"):]jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))if options.params:jobParams = jobParams + " " + options.paramsif options.jobid:commandMap["jobid"] = options.jobidcommandMap["jvm"] = tempJVMCommandcommandMap["params"] = jobParamscommandMap["job"] = jobResourcereturn Template(ENGINE_COMMAND).substitute(**commandMap)
流程如下:
- 处理jvm参数
- 处理datax参数(要执行的json,日志等)
- 组装java命令
可以通过
print startCommand
打印最终的命令看看,就是一个标准的java命令。类似下面这种:
# java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\xxxx\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\xxxx\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\xxxx\github\DataX\target\datax\datax/lib/* -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\xxxx\github\DataX\target\datax\datax\job\job.json
datax的启动文体datax.py解析相关推荐
- 大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署
文章目录 1. DataX简介 1.1 DataX概述 1.2 DataX支持的数据源 2. DataX架构原理 2.1 DataX设计理念 2.2 DataX框架设计 2.3 DataX运行流程 2 ...
- 大数据技术之DataX (一)DataX插件开发
文章目录 一.背景 二.基于java的本地测试datax 2.1 github上下载datax的源代码 2.2 datax代码导入idea 三.docker安装南大通用数据库GBase和GBase 8 ...
- 根据名称获得treeview节点_冶金行业首个!中冶赛迪正式启动工业互联网标识解析二级节点建设...
近日,国家工业和信息化部信息技术发展司公示了"2020年工业互联网创新发展工程-工业互联网标识解析二级节点(特定行业应用服务平台)项目"候选人名称.由中冶赛迪重庆信息技术有限公司牵 ...
- [日更-2019.4.8、4.9、4.12、4.13] cm-14.1 Android系统启动过程分析(一)-init进程的启动、rc脚本解析、zygote启动、属性服务...
2019独角兽企业重金招聘Python工程师标准>>> 声明 前阶段在项目中涉及到了Android系统定制任务,Android系统定制前提要知道Android系统是如何启动的. 本文 ...
- YOLOv3 代码详解(2) —— 数据处理 dataset.py解析:输入图片增强、制作模型的每层输出的标签
前言: yolo系列的论文阅读 论文阅读 || 深度学习之目标检测 重磅出击YOLOv3 论文阅读 || 深度学习之目标检测yolov2 论文阅读 || 深度学习之目标检测yolov1 该篇讲解的 ...
- PASCAL VOC的评估代码voc_eval.py解析
参考 PASCAL VOC的评估代码voc_eval.py解析 - 云+社区 - 腾讯云 目录 1.读检测的结果 2.解析一幅图像中的目标数 3.计算AP 4.VOC的评估 5.进行python评估 ...
- 嵌入式linux的u-boot系统启动过程,嵌入式linux操作系统u-boot启动顺序以及代码解析...
嵌入式linux操作系统u-boot启动顺序以及代码解析 (9页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦! 9.9 积分 Bootloader/u-bo ...
- MMDetection框架的anchor_generators.py解析与船数据解析
anchor_generators.py解析 import mmcv import numpy as np import torch from torch.nn.modules.utils impor ...
- caffe实战之classify.py解析
本文将对caffe/python下的classify.py代码以及相关的classifier.py和io.py进行解析. 一.classify.py 由最后的if__name__ == '__main ...
最新文章
- python interpreter 中没有torch_PyTorch扩展自定义PyThon/C++(CUDA)算子的若干方法总结
- Excel双样本T检验之成对检验
- flink与flink-client的版本对应
- xss 表单劫持(from通用明文记录)
- 体验VSTS源代码管理之一
- 深入理解HTTP协议
- mysql建库权限_mysql数据库用户权限及建库脚本
- C#中is vs as 1
- 模拟时钟在LCD上的显示
- ORA-01502-对应的快速解决办法(索引或这类索引的分区处于不可用状态)
- SpringMVC学习指南【笔记4】数据绑定、表单标签库、转换器、格式化、验证器
- 计算机网络拓扑图 模板,网络拓扑图绘制.doc
- debounce 防抖函数
- 软件技术基础(一):绪论
- 奇偶页不同页眉页脚设置
- 在线约会其实就是网络泡妞,Meexo 反其道而行为你隐姓埋名
- C语言中的TRUE和FALSE
- 华为android5.1版本,华为P10国行版搭载的EMUI5.1怎么样?
- pool win10提示bad_Win10怎么修复出现bad pool header蓝屏的情况?
- 转转验机源码带验机报告
热门文章
- jqweui表单日期只选年月的问题
- PostGIS 3.1.2软件安装详细教程(地图工具篇.8)
- matlab处理数字岩心图像,一种用神经网络进行高质量数字岩心图像处理的方法与流程...
- ArcGIS10.2安装教程(win11版)
- d3007(d3007经过站点和时间)
- 贯穿整个AUTOSAR架构的Interface
- Ubuntu 下安装 SVN 服务端
- Mac App Store下载总是发生错误如何解决?
- 用VHDL编写数字时钟
- 微信公众平台注册所需资料(服务号和小程序一样)