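gpload.py is written quite concisely. Its main logic is concentrated in the gpload class. The most important method there is run, and run's main job is simply to call run2.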

    if __name__ == '__main__':
        g = gpload(sys.argv[1:])
        g.run()
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(g.exitValue)

gpload constructor

    def __init__(self,argv):
        self.threads = [] # remember threads so that we can join() against them
        self.exitValue = 0
        self.options = options()
        self.options.h = None
        self.options.gpfdist_timeout = None
        self.options.p = None
        self.options.U = None
        self.options.W = False
        self.options.D = False
        self.options.no_auto_trans = False
        self.options.password = None
        self.options.d = None
        self.DEBUG = 5
        self.LOG = 4
        self.INFO = 3
        self.WARN = 2
        self.ERROR = 1
        self.options.qv = self.INFO
        self.options.l = None
        self.formatOpts = ""
        self.startTimestamp = time.time()
        self.error_table = False
        self.gpdb_version = ""
        seenv = False
        seenq = False

        # Create Temp and External table names. However external table name could
        # get overwritten with another name later on (see create_external_table_name).
        # MPP-20927: gpload external table name problem. We use uuid to avoid
        # external table name confliction.
        self.unique_suffix = str(uuid.uuid1()).replace('-', '_')
        self.staging_table_name = 'temp_staging_gpload_' + self.unique_suffix
        self.extTableName  = 'ext_gpload_' + self.unique_suffix

        # SQL to run in order to undo our temporary work
        self.cleanupSql = []
        self.distkey = None
        configFilename = None
        while argv:
            try:
                try:
                    if argv[0]=='-h':
                        self.options.h = argv[1]
                        argv = argv[2:]
                    if argv[0]=='--gpfdist_timeout':
                        self.options.gpfdist_timeout = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-p':
                        self.options.p = int(argv[1])
                        argv = argv[2:]
                    elif argv[0]=='-l':
                        self.options.l = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-q':
                        self.options.qv -= 1
                        argv = argv[1:]
                        seenq = True
                    elif argv[0]=='--version':
                        sys.stderr.write("gpload version $Revision$\n")
                        sys.exit(0)
                    elif argv[0]=='-v':
                        self.options.qv = self.LOG
                        argv = argv[1:]
                        seenv = True
                    elif argv[0]=='-V':
                        self.options.qv = self.DEBUG
                        argv = argv[1:]
                        seenv = True
                    elif argv[0]=='-W':
                        self.options.W = True
                        argv = argv[1:]
                    elif argv[0]=='-D':
                        self.options.D = True
                        argv = argv[1:]
                    elif argv[0]=='-U':
                        self.options.U = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-d':
                        self.options.d = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='-f':
                        configFilename = argv[1]
                        argv = argv[2:]
                    elif argv[0]=='--no_auto_trans':
                        self.options.no_auto_trans = True
                        argv = argv[1:]
                    elif argv[0]=='-?':
                        usage()
                    else:
                        break
                except IndexError:
                    sys.stderr.write("Option %s needs a parameter.\n"%argv[0])
                    sys.exit(2)
            except ValueError:
                sys.stderr.write("Parameter for option %s must be an integer.\n"%argv[0])
                sys.exit(2)

        if configFilename==None:
            usage('configuration file required')
        elif argv:
            a = ""
            if len(argv) > 1:
                a = "s"
            usage('unrecognized argument%s: %s' % (a, ' '.join(argv)))

        # default to gpAdminLogs for a log file, may be overwritten
        if self.options.l is None:
            self.options.l = os.path.join(os.environ.get('HOME', '.'),'gpAdminLogs')
            if not os.path.isdir(self.options.l):
                os.mkdir(self.options.l)

            self.options.l = os.path.join(self.options.l, 'gpload_' + \
                                          datetime.date.today().strftime('%Y%m%d') + '.log')

        try:
            self.logfile = open(self.options.l,'a')
        except Exception, e:
            self.log(self.ERROR, "could not open logfile %s: %s" % \
                      (self.options.l, e))

        if seenv and seenq:
            self.log(self.ERROR, "-q conflicts with -v and -V")

        if self.options.D:
            self.log(self.INFO, 'gpload has the -D option, so it does not actually load any data')

        try:
            f = open(configFilename,'r')
        except IOError,e:
            self.log(self.ERROR, "could not open configuration file: %s" % e)

        # pull in the config file, which should be in valid YAML
        try:
            # do an initial parse, validating the config file
            doc = f.read()
            self.config = yaml.load(doc)

            self.configOriginal = changeToUnicode(self.config)
            self.config = dictKeyToLower(self.config)
            ver = self.getconfig('version', unicode, extraStuff = ' tag')
            if ver != '1.0.0.1':
                self.control_file_error("gpload configuration schema version must be 1.0.0.1")
            # second parse, to check that the keywords are sensible
            y = yaml.compose(doc)
            # first should be MappingNode
            if not isinstance(y, yaml.MappingNode):
                self.control_file_error("configuration file must begin with a mapping")

            yaml_walk(self, y.value, [])
        except yaml.scanner.ScannerError,e:
            self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))
        except yaml.reader.ReaderError, e:
            es = ""
            if isinstance(e.character, str):
                es = "'%s' codec can't decode byte #x%02x: %s position %d" % \
                        (e.encoding, ord(e.character), e.reason,
                         e.position)
            else:
                es = "unacceptable character #x%04x at byte %d: %s"    \
                    % (ord(e.character), e.position, e.reason)
            self.log(self.ERROR, es)
        except yaml.error.MarkedYAMLError, e:
            self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))

        f.close()
        self.subprocesses = []
        self.log(self.INFO,'gpload session started ' + \
                 datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
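The constructor parses argv by hand, and the verbosity options interact in a specific way. A minimal sketch of just that part, in Python 3. `verbosity` is a hypothetical helper and not part of gpload.py. The level values are copied from the constructor above. The sketch raises an exception where gpload calls `self.log(self.ERROR, ...)`, and it skips over every other option instead of parsing it.

```python
# Log levels, using the same values as gpload's constructor
DEBUG, LOG, INFO, WARN, ERROR = 5, 4, 3, 2, 1


def verbosity(argv):
    """Hypothetical helper: derive gpload's qv level from -q/-v/-V.

    qv starts at INFO; each -q lowers it by one, -v sets LOG, -V sets DEBUG,
    and combining -q with -v or -V is rejected. All other arguments
    (including option parameters such as the -f file name) are ignored here.
    """
    qv = INFO
    seenq = seenv = False
    for arg in argv:
        if arg == '-q':
            qv -= 1
            seenq = True
        elif arg == '-v':
            qv = LOG
            seenv = True
        elif arg == '-V':
            qv = DEBUG
            seenv = True
    if seenq and seenv:
        # gpload reports this through self.log(self.ERROR, ...)
        raise ValueError('-q conflicts with -v and -V')
    return qv


print(verbosity(['-f', 'load.yml', '-q']))  # prints 2 (WARN)
```

Because `-q` subtracts from the current level while `-v`/`-V` overwrite it, mixing them would make the result order-dependent. The explicit conflict check avoids that ambiguity.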

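The most important step in the constructor is loading the YAML configuration file. After a first pass with yaml.load, the document is parsed a second time with yaml.compose, and yaml_walk then checks that every keyword in the file is valid. To add a new keyword to the YAML configuration, you add an entry for it to the keyword dictionary that yaml_walk validates against. The entry states whether the keyword's child keywords should be parsed and which keyword is its parent.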
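A simplified sketch of that keyword check follows. Assumptions to note: `VALID_TOKENS` and `walk` are hypothetical names, and the table is a small illustrative subset modeled on the description above (a `parse_children` flag and a `parent` per keyword). The sketch walks plain Python dicts and lists instead of PyYAML nodes, and it compares keywords case-insensitively.

```python
# Hypothetical subset of a keyword table: each keyword records whether its
# children are validated and which keyword must be its parent (None = top level).
VALID_TOKENS = {
    'version':  {'parse_children': True,  'parent': None},
    'database': {'parse_children': True,  'parent': None},
    'gpload':   {'parse_children': True,  'parent': None},
    'input':    {'parse_children': True,  'parent': 'gpload'},
    'source':   {'parse_children': True,  'parent': 'input'},
    # column names are user data, so they are not checked as keywords
    'columns':  {'parse_children': False, 'parent': 'input'},
    'output':   {'parse_children': True,  'parent': 'gpload'},
    'table':    {'parse_children': True,  'parent': 'output'},
    'mode':     {'parse_children': True,  'parent': 'output'},
}


def walk(node, parent=None, path=()):
    """Return a list of problems found in a parsed config (dicts, lists, scalars)."""
    errors = []
    if isinstance(node, list):
        # a YAML sequence (e.g. the INPUT list) keeps the same parent keyword
        for item in node:
            errors.extend(walk(item, parent, path))
        return errors
    if not isinstance(node, dict):
        return errors  # scalar value: nothing to validate
    for key, value in node.items():
        name = str(key).lower()  # case-insensitive match (assumption of this sketch)
        where = ':'.join(path + (name,))
        spec = VALID_TOKENS.get(name)
        if spec is None:
            errors.append('unrecognized keyword %s' % where)
        elif spec['parent'] != parent:
            errors.append('%s is not allowed here' % where)
        elif spec['parse_children']:
            errors.extend(walk(value, name, path + (name,)))
    return errors
```

With this layout, supporting a new keyword is a one-line change. For example, a hypothetical `new_option` under OUTPUT would need `'new_option': {'parse_children': True, 'parent': 'output'}`. Without that entry, it would be reported as unrecognized.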

gpload run method (run2)

    def run2(self):
        self.log(self.DEBUG, 'config ' + str(self.config))
        start = time.time()
        self.read_config()
        self.setup_connection()
        self.read_table_metadata()
        self.read_columns()
        self.read_mapping()
        self.start_gpfdists()
        self.do_method()
        self.log(self.INFO, 'running time: %.2f seconds'%(time.time()-start))

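read_config mainly reads the table name. If the name includes a schema, the schema part is assigned to self.schema; otherwise self.schema is None. read_config also resolves host, port, user, database and similar parameters with the precedence command line > config file > environment variable.
setup_connection connects to the database.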
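The two read_config rules just described can be sketched in a few lines of Python 3. `split_table_name` and `resolve` are hypothetical helpers, not gpload.py functions. The `PG*` variable names in the usage are the standard libpq ones, used here as an assumption about which environment variables apply. The table-name split also ignores quoted identifiers that contain dots, which a real implementation would have to handle.

```python
import os


def split_table_name(name):
    """Split 'schema.table' into (schema, table); schema is None if absent.

    Simplified: quoted identifiers containing '.' are not handled.
    """
    if '.' in name:
        schema, table = name.split('.', 1)
        return schema, table
    return None, name


def resolve(cli_value, config_value, env_var, env=None, default=None):
    """Return the first value that is set: command line > config file > environment."""
    if env is None:
        env = os.environ
    for value in (cli_value, config_value, env.get(env_var)):
        if value is not None:
            return value
    return default


env = {'PGHOST': 'env-host', 'PGPORT': '6543'}
host = resolve(None, 'cfg-host', 'PGHOST', env)  # config file wins over env
port = resolve(None, None, 'PGPORT', env)        # only env is set
print(split_table_name('public.orders'), host, port)
```

A `None` schema is what later triggers the schema lookup in read_table_metadata.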

read_table_metadata

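read_table_metadata reads the table's metadata using two common Greenplum queries (covered in the posts "Greenplum common SQL: find the schema name from a table name" and "Greenplum common SQL: query column names, types, and sequences from a table name"). The first query looks up the table's schema when none was specified in the configuration file. The second fetches each column's name, type, and whether it is backed by a sequence.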

    def read_table_metadata(self):
        # KAS Note to self. If schema is specified, then probably should use PostgreSQL rules for defining it.

        # find the shema name for this table (according to search_path)
        # if it was not explicitly specified in the configuration file.
        if self.schema is None:
            queryString = """SELECT n.nspname
                             FROM pg_catalog.pg_class c
                             LEFT JOIN pg_catalog.pg_namespace n
                             ON n.oid = c.relnamespace
                             WHERE c.relname = '%s'
                             AND pg_catalog.pg_table_is_visible(c.oid);""" % quote_unident(self.table)

            resultList = self.db.query(queryString.encode('utf-8')).getresult()

            if len(resultList) > 0:
                self.schema = (resultList[0])[0]
                self.log(self.INFO, "setting schema '%s' for table '%s'" % (self.schema, quote_unident(self.table)))
            else:
                self.log(self.ERROR, "table %s not found in any database schema" % self.table)

        queryString = """select nt.nspname as table_schema,
                           c.relname as table_name,
                           a.attname as column_name,
                           a.attnum as ordinal_position,
                           format_type(a.atttypid, a.atttypmod) as data_type,
                           c.relkind = 'r' AS is_updatable,
                           a.atttypid in (23, 20) and a.atthasdef and
                             (select position ( 'nextval(' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0 and
                                     position ( '::regclass)' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0
                              FROM pg_catalog.pg_attrdef d
                              WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef) as has_sequence
                    from pg_catalog.pg_class c join pg_catalog.pg_namespace nt on (c.relnamespace = nt.oid)
                       join pg_attribute a on (a.attrelid = c.oid)
                    where c.relname = '%s' and nt.nspname = '%s'
                      and a.attnum > 0 and a.attisdropped = 'f'
                    order by a.attnum """ % (quote_unident(self.table), quote_unident(self.schema))

        count = 0
        self.into_columns = []
        self.into_columns_dict = dict()
        resultList = self.db.query(queryString.encode('utf-8')).dictresult()
        while count < len(resultList):
            row = resultList[count]
            count += 1
            ct = unicode(row['data_type'])
            if ct == 'bigserial':
                ct = 'bigint'
            elif ct == 'serial':
                ct = 'int4'
            name = unicode(row['column_name'], 'utf-8')
            name = quote_ident(name)
            if unicode(row['has_sequence']) != unicode('f'):
                has_seq = True
            else:
                has_seq = False
            i = [name,ct,None, has_seq]
            self.into_columns.append(i)
            self.into_columns_dict[name] = i
            self.log(self.DEBUG, "found input column: " + str(i))
        if count == 0:
            # see if it's a permissions issue or it actually doesn't exist
            tableName = quote_unident(self.table)
            tableSchema = quote_unident(self.schema)
            sql = """select 1 from pg_class c, pg_namespace n
                        where c.relname = '%s' and
                        n.nspname = '%s' and
                        n.oid = c.relnamespace""" % (tableName, tableSchema)
            resultList = self.db.query(sql.encode('utf-8')).getresult()
            if len(resultList) > 0:
                self.log(self.ERROR, "permission denied for table %s.%s" % \
                            (tableSchema, tableName))
            else:
                self.log(self.ERROR, 'table %s.%s does not exist in database %s'% (tableSchema, tableName, self.options.d))

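After this function runs, we have two structures. self.into_columns is a list whose elements are [name, ct, None, has_seq] (column name, column type, None, whether the column has a sequence). into_columns_dict is a dictionary keyed by column name whose values are those same [name, ct, None, has_seq] lists. The None slot is filled in later by read_mapping.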
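A Python 3 sketch of how those two structures are built from the query rows. `build_into_columns` is a hypothetical helper. The rows are assumed to be dicts like those `dictresult()` returns, with `has_sequence` as `'t'`/`'f'`. Column names are used as-is instead of being passed through gpload's `quote_ident()`.

```python
def build_into_columns(rows):
    """Turn metadata query rows into (into_columns, into_columns_dict).

    Each row is a dict with at least 'column_name', 'data_type' and
    'has_sequence' ('t' or 'f'). Names are not quoted here, unlike gpload.
    """
    into_columns = []
    into_columns_dict = {}
    for row in rows:
        ct = row['data_type']
        # same serial-type normalization as read_table_metadata
        if ct == 'bigserial':
            ct = 'bigint'
        elif ct == 'serial':
            ct = 'int4'
        has_seq = row['has_sequence'] != 'f'
        # slot 2 (the mapping) stays None until read_mapping fills it
        entry = [row['column_name'], ct, None, has_seq]
        into_columns.append(entry)
        into_columns_dict[row['column_name']] = entry
    return into_columns, into_columns_dict


rows = [
    {'column_name': 'id', 'data_type': 'bigint', 'has_sequence': 't'},
    {'column_name': 'name', 'data_type': 'text', 'has_sequence': 'f'},
]
cols, by_name = build_into_columns(rows)
print(cols)  # [['id', 'bigint', None, True], ['name', 'text', None, False]]
```

The list and the dictionary share the same inner list objects. So when read_mapping later writes into slot 2 of an element, the change is visible through both.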

read_columns

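read_columns lets the user specify which columns the incoming data corresponds to. It builds from_columns, whose elements are [column name, column type, None, False]; a column listed without a type takes its type from the same-named column of the target table. If the user does not specify any columns, from_columns is simply into_columns.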

    def read_columns(self):
        columns = self.getconfig('gpload:input:columns',list,None, returnOriginal=True)
        if columns != None:
            self.from_cols_from_user = True # user specified from columns
            self.from_columns = []
            for d in columns:
                if type(d)!=dict:
                    self.control_file_error("gpload:input:columns must be a sequence of YAML mappings")
                tempkey = d.keys()[0]
                value = d[tempkey]
                """ remove leading or trailing spaces """
                d = { tempkey.strip() : value }
                key = d.keys()[0]
                if d[key] is None:
                    self.log(self.DEBUG,
                             'getting source column data type from target')
                    for name, typ, mapto, hasseq in self.into_columns:
                        if sqlIdentifierCompare(name, key):
                            d[key] = typ
                            break

                # perform the same kind of magic type change that postgres does
                if d[key] == 'bigserial':
                    d[key] = 'bigint'
                elif d[key] == 'serial':
                    d[key] = 'int4'

                # Mark this column as having no mapping, which is important
                # for do_insert()
                self.from_columns.append([key.lower(),d[key].lower(),None, False])
        else:
            self.from_columns = self.into_columns
            self.from_cols_from_user = False

        # make sure that all columns have a type
        for name, typ, map, hasseq in self.from_columns:
            if typ is None:
                self.log(self.ERROR, 'column "%s" has no type ' % name +
                       'and does not appear in target table "%s"' % self.schemaTable)
        self.log(self.DEBUG, 'from columns are:')
        for c in self.from_columns:
            name = c[0]
            typ = c[1]
            self.log(self.DEBUG, '%s: %s'%(name,typ))
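The read_columns logic reduces to a small function. This is a Python 3 sketch under stated assumptions:

- `build_from_columns` and `same_identifier` are hypothetical names.
- `same_identifier` is a plain case-insensitive stand-in for gpload's `sqlIdentifierCompare()`, which also deals with quoted identifiers.
- The sketch raises as soon as a column has no type. gpload performs that check afterwards, in its "make sure that all columns have a type" loop.

```python
def same_identifier(a, b):
    # Simplified stand-in for gpload's sqlIdentifierCompare(): a plain
    # case-insensitive comparison that ignores quoted-identifier rules.
    return a.lower() == b.lower()


def build_from_columns(user_columns, into_columns):
    """Sketch of read_columns.

    user_columns: the COLUMNS list from the config, e.g. [{'id': None},
    {'amount': 'serial'}], or None when the config has no COLUMNS entry.
    into_columns: [name, type, mapping, has_seq] lists from the target table.
    """
    if user_columns is None:
        return into_columns  # same list object, as in gpload
    from_columns = []
    for d in user_columns:
        if not isinstance(d, dict):
            raise ValueError('gpload:input:columns must be a sequence of YAML mappings')
        key, typ = next(iter(d.items()))
        key = key.strip()  # remove leading or trailing spaces
        if typ is None:
            # no type given: borrow it from the same-named target column
            for name, target_type, _, _ in into_columns:
                if same_identifier(name, key):
                    typ = target_type
                    break
        if typ is None:
            raise ValueError('column "%s" has no type and does not appear '
                             'in the target table' % key)
        # same serial-type normalization that Postgres does
        if typ == 'bigserial':
            typ = 'bigint'
        elif typ == 'serial':
            typ = 'int4'
        # mapping slot None and has_seq False, as in gpload
        from_columns.append([key.lower(), typ.lower(), None, False])
    return from_columns


into = [['id', 'bigint', None, True], ['name', 'text', None, False]]
print(build_from_columns([{' id ': None}, {'Amount': 'serial'}], into))
# [['id', 'bigint', None, False], ['amount', 'int4', None, False]]
```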

read_mapping

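If a mapping is configured, each mapping key is looked up among the column names in self.into_columns. On a match, the mapped value is stored in the third slot of that element; a key that matches no column is an error. If no mapping is configured, every input column that is not mapped yet is mapped to the target column of the same name, one to one ("map anything yet to be mapped to itself", as the code comment puts it). An input column with no same-named target column is an error.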

    def read_mapping(self):
        mapping = self.getconfig('gpload:output:mapping',dict,None, returnOriginal=True)

        if mapping:
            for key,value in mapping.iteritems():
                if type(key) != unicode or type(value) != unicode:
                    self.control_file_error("gpload:output:mapping must be a YAML type mapping from strings to strings")
                found = False
                for a in self.into_columns:
                    if sqlIdentifierCompare(a[0], key) == True:
                        a[2] = value
                        found = True
                        break
                if found == False:
                    self.log(self.ERROR,'%s in mapping is not in table %s'% \
                                (key, self.schemaTable))
        else:
            # Now, map anything yet to be mapped to itself, picking up on those
            # columns which are not found in the table.
            for x in self.from_columns:
                # Check to see if it already has a mapping value
                i = filter(lambda a:a[2] == x[0], self.into_columns)
                if not i:
                    # Check to see if the target column names match the input column names.
                    for a in self.into_columns:
                        if sqlIdentifierCompare(a[0], x[0]) == True:
                            i = a
                            break
                    if i:
                        if i[2] is None: i[2] = i[0]
                    else:
                        self.log(self.ERROR, 'no mapping for input column ' +
                                 '"%s" to output table' % x[0])

        for name,typ,mapto,seq in self.into_columns:
            self.log(self.DEBUG,'%s: %s = %s'%(name,typ,mapto))
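Both branches of read_mapping fit into one Python 3 function. `apply_mapping` and `same_identifier` are hypothetical names. `same_identifier` is again a simplified case-insensitive stand-in for `sqlIdentifierCompare()`. Errors are raised as exceptions rather than logged.

```python
def same_identifier(a, b):
    # Simplified stand-in for gpload's sqlIdentifierCompare()
    return a.lower() == b.lower()


def apply_mapping(into_columns, from_columns, mapping=None):
    """Sketch of read_mapping: fills slot 2 (the mapping) of into_columns in place."""
    if mapping:
        for key, value in mapping.items():
            for col in into_columns:
                if same_identifier(col[0], key):
                    col[2] = value
                    break
            else:
                # for/else: runs only when no target column matched
                raise ValueError('%s in mapping is not in the target table' % key)
    else:
        for src in from_columns:
            # skip input columns that some target column is already mapped to
            if any(col[2] == src[0] for col in into_columns):
                continue
            target = next((col for col in into_columns
                           if same_identifier(col[0], src[0])), None)
            if target is None:
                raise ValueError('no mapping for input column "%s" to output table' % src[0])
            if target[2] is None:
                target[2] = target[0]  # map the column to itself
    return into_columns


into = [['id', 'bigint', None, True], ['name', 'text', None, False]]
src = [['id', 'bigint', None, False], ['name', 'text', None, False]]
print([c[2] for c in apply_mapping(into, src)])  # ['id', 'name']
```

Note that with an explicit mapping, target columns that do not appear in it keep `None` in slot 2. Only the no-mapping branch fills in identity mappings.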

start_gpfdists

(To be analyzed in a later post.)

do_method

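1. If a transaction is needed (no_auto_trans is off and the mode is not insert), issue BEGIN. If preload is configured, read truncate, reuse_tables, fast_match and staging_table from it. When truncate is set, the target table is truncated before loading, which is only allowed in insert mode. Because GP5 does not support error_table, if that feature is selected, reuse_tables and log_errors are both set to true.
2. Run the user's before SQL, then call the function for the configured mode (do_method_insert, do_method_update or do_method_merge).
3. In merge or update mode, truncate the staging table (staging_table_name).
4. Run the user's after SQL. If no_auto_trans is off and the mode is not insert, issue COMMIT.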

    def do_method(self):
        # Is the table to be truncated before the load?
        preload = self.getconfig('gpload:preload', list, default=None)
        method = self.getconfig('gpload:output:mode', unicode, 'insert').lower()
        self.log_errors = self.getconfig('gpload:input:log_errors', bool, False)
        truncate = False
        self.reuse_tables = False

        if not self.options.no_auto_trans and not method=='insert':
            self.db.query("BEGIN")

        if preload:
            truncate = self.getconfig('gpload:preload:truncate',bool,False)
            self.reuse_tables = self.getconfig('gpload:preload:reuse_tables',bool,False)
            self.fast_match = self.getconfig('gpload:preload:fast_match',bool,False)
            if self.reuse_tables == False and self.fast_match == True:
                self.log(self.WARN, 'fast_match is ignored when reuse_tables is false!')
            self.staging_table = self.getconfig('gpload:preload:staging_table', unicode, default=None)
        if self.error_table:
            self.log_errors = True
            self.reuse_tables = True
        if truncate == True:
            if method=='insert':
                self.do_truncate(self.schemaTable)
            else:
                self.log(self.ERROR, 'preload truncate operation should be used with insert ' +
                                     'operation only. used with %s' % method)

        # sql pre or post processing?
        sql = self.getconfig('gpload:sql', list, default=None)
        before   = None
        after    = None
        if sql:
            before   = self.getconfig('gpload:sql:before', unicode, default=None)
            after    = self.getconfig('gpload:sql:after', unicode, default=None)
        if before:
            self.log(self.LOG, "Pre-SQL from user: %s" % before)
            if not self.options.D:
                try:
                    self.db.query(before.encode('utf-8'))
                except Exception, e:
                    self.log(self.ERROR, 'could not execute SQL in sql:before "%s": %s' %
                             (before, str(e)))

        if method=='insert':
            self.do_method_insert()
        elif method=='update':
            self.do_method_update()
        elif method=='merge':
            self.do_method_merge()
        else:
            self.control_file_error('unsupported method %s' % method)

        # truncate the staging table to avoid dumping it's content - see MPP-15474
        if method=='merge' or method=='update':
            self.do_truncate(self.staging_table_name)

        if after:
            self.log(self.LOG, "Post-SQL from user: %s" % after)
            if not self.options.D:
                try:
                    self.db.query(after.encode('utf-8'))
                except Exception, e:
                    self.log(self.ERROR, 'could not execute SQL in sql:after "%s": %s' %
                             (after, str(e)))

        if not self.options.no_auto_trans and not method=='insert':
            self.db.query("COMMIT")
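To make the order of operations in do_method concrete, here is a Python 3 sketch run against a fake connection that only records statements. Everything in it is hypothetical: `RecordingDB` and `do_method_sketch` are illustrative names. `'<insert load>'` and `'TRUNCATE <staging table>'` are placeholder strings standing in for do_method_insert/update/merge and do_truncate. `dry_run` plays the role of `-D`, which in do_method skips the user's before/after SQL. Preload handling is left out.

```python
class RecordingDB:
    """Hypothetical stand-in for the database handle: it only records SQL."""

    def __init__(self):
        self.queries = []

    def query(self, sql):
        self.queries.append(sql)


def do_method_sketch(db, method, before=None, after=None,
                     no_auto_trans=False, dry_run=False):
    """Order of operations in do_method (preload handling omitted)."""
    method = method.lower()
    transactional = not no_auto_trans and method != 'insert'
    if transactional:
        db.query('BEGIN')
    if before and not dry_run:
        db.query(before)
    if method in ('insert', 'update', 'merge'):
        db.query('<%s load>' % method)  # placeholder for do_method_<mode>()
    else:
        raise ValueError('unsupported method %s' % method)
    if method in ('merge', 'update'):
        # placeholder for do_truncate(self.staging_table_name)
        db.query('TRUNCATE <staging table>')
    if after and not dry_run:
        db.query(after)
    if transactional:
        db.query('COMMIT')


db = RecordingDB()
do_method_sketch(db, 'merge', before='SELECT 1', after='SELECT 2')
print(db.queries)
# ['BEGIN', 'SELECT 1', '<merge load>', 'TRUNCATE <staging table>', 'SELECT 2', 'COMMIT']
```

The trace shows why the staging-table truncate and the user's after SQL land inside the same transaction as the merge or update. It also shows that insert mode never opens an explicit transaction here.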

Pay particular attention to the reuse_tables, fast_match, staging_table and log_errors options.
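The interplay of these four options can be isolated in a small Python 3 function. `resolve_preload_options` is a hypothetical name. The PRELOAD settings are assumed to arrive as one flat dict with lower-case keys, whereas gpload reads them via `getconfig` paths. gpload's WARN log is approximated with `warnings.warn`. Defaulting `fast_match` to False when there is no PRELOAD section is also an assumption of this sketch.

```python
import warnings


def resolve_preload_options(preload=None, error_table=False, log_errors=False):
    """Sketch of the option handling at the top of do_method.

    preload: PRELOAD settings as a dict with lower-case keys, or None.
    error_table: whether the legacy error-table feature was requested.
    log_errors: the value of gpload:input:log_errors.
    """
    opts = {'truncate': False, 'reuse_tables': False, 'fast_match': False,
            'staging_table': None, 'log_errors': log_errors}
    if preload:
        opts['truncate'] = bool(preload.get('truncate', False))
        opts['reuse_tables'] = bool(preload.get('reuse_tables', False))
        opts['fast_match'] = bool(preload.get('fast_match', False))
        opts['staging_table'] = preload.get('staging_table')
        if opts['fast_match'] and not opts['reuse_tables']:
            # gpload only logs a warning; the flag itself is left set
            warnings.warn('fast_match is ignored when reuse_tables is false!')
    if error_table:
        # GP5 has no error tables, so fall back to logged errors
        opts['log_errors'] = True
        opts['reuse_tables'] = True
    return opts


print(resolve_preload_options(None, error_table=True))
# {'truncate': False, 'reuse_tables': True, 'fast_match': False, 'staging_table': None, 'log_errors': True}
```

The error_table override is applied after the PRELOAD settings are read. It therefore turns reuse_tables on even when PRELOAD explicitly sets it to false.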
