Learning the Greenplum Python tool library gpload — the gpload class
The code in gpload.py is fairly concise; the main logic lives in the gpload class. Its run function is the most important entry point, and run essentially just wraps a call to run2.
if __name__ == '__main__':
    g = gpload(sys.argv[1:])
    g.run()
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(g.exitValue)
The gpload constructor
def __init__(self, argv):
    self.threads = [] # remember threads so that we can join() against them
    self.exitValue = 0
    self.options = options()
    self.options.h = None
    self.options.gpfdist_timeout = None
    self.options.p = None
    self.options.U = None
    self.options.W = False
    self.options.D = False
    self.options.no_auto_trans = False
    self.options.password = None
    self.options.d = None
    self.DEBUG = 5
    self.LOG = 4
    self.INFO = 3
    self.WARN = 2
    self.ERROR = 1
    self.options.qv = self.INFO
    self.options.l = None
    self.formatOpts = ""
    self.startTimestamp = time.time()
    self.error_table = False
    self.gpdb_version = ""
    seenv = False
    seenq = False

    # Create Temp and External table names. However external table name could
    # get overwritten with another name later on (see create_external_table_name).
    # MPP-20927: gpload external table name problem. We use uuid to avoid
    # external table name confliction.
    self.unique_suffix = str(uuid.uuid1()).replace('-', '_')
    self.staging_table_name = 'temp_staging_gpload_' + self.unique_suffix
    self.extTableName = 'ext_gpload_' + self.unique_suffix

    # SQL to run in order to undo our temporary work
    self.cleanupSql = []
    self.distkey = None
    configFilename = None

    while argv:
        try:
            try:
                if argv[0]=='-h':
                    self.options.h = argv[1]
                    argv = argv[2:]
                if argv[0]=='--gpfdist_timeout':
                    self.options.gpfdist_timeout = argv[1]
                    argv = argv[2:]
                elif argv[0]=='-p':
                    self.options.p = int(argv[1])
                    argv = argv[2:]
                elif argv[0]=='-l':
                    self.options.l = argv[1]
                    argv = argv[2:]
                elif argv[0]=='-q':
                    self.options.qv -= 1
                    argv = argv[1:]
                    seenq = True
                elif argv[0]=='--version':
                    sys.stderr.write("gpload version $Revision$\n")
                    sys.exit(0)
                elif argv[0]=='-v':
                    self.options.qv = self.LOG
                    argv = argv[1:]
                    seenv = True
                elif argv[0]=='-V':
                    self.options.qv = self.DEBUG
                    argv = argv[1:]
                    seenv = True
                elif argv[0]=='-W':
                    self.options.W = True
                    argv = argv[1:]
                elif argv[0]=='-D':
                    self.options.D = True
                    argv = argv[1:]
                elif argv[0]=='-U':
                    self.options.U = argv[1]
                    argv = argv[2:]
                elif argv[0]=='-d':
                    self.options.d = argv[1]
                    argv = argv[2:]
                elif argv[0]=='-f':
                    configFilename = argv[1]
                    argv = argv[2:]
                elif argv[0]=='--no_auto_trans':
                    self.options.no_auto_trans = True
                    argv = argv[1:]
                elif argv[0]=='-?':
                    usage()
                else:
                    break
            except IndexError:
                sys.stderr.write("Option %s needs a parameter.\n"%argv[0])
                sys.exit(2)
        except ValueError:
            sys.stderr.write("Parameter for option %s must be an integer.\n"%argv[0])
            sys.exit(2)

    if configFilename==None:
        usage('configuration file required')
    elif argv:
        a = ""
        if len(argv) > 1:
            a = "s"
        usage('unrecognized argument%s: %s' % (a, ' '.join(argv)))

    # default to gpAdminLogs for a log file, may be overwritten
    if self.options.l is None:
        self.options.l = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs')
        if not os.path.isdir(self.options.l):
            os.mkdir(self.options.l)
        self.options.l = os.path.join(self.options.l, 'gpload_' + \
                datetime.date.today().strftime('%Y%m%d') + '.log')

    try:
        self.logfile = open(self.options.l, 'a')
    except Exception, e:
        self.log(self.ERROR, "could not open logfile %s: %s" % \
                (self.options.l, e))

    if seenv and seenq:
        self.log(self.ERROR, "-q conflicts with -v and -V")

    if self.options.D:
        self.log(self.INFO, 'gpload has the -D option, so it does not actually load any data')

    try:
        f = open(configFilename, 'r')
    except IOError, e:
        self.log(self.ERROR, "could not open configuration file: %s" % e)

    # pull in the config file, which should be in valid YAML
    try:
        # do an initial parse, validating the config file
        doc = f.read()
        self.config = yaml.load(doc)

        self.configOriginal = changeToUnicode(self.config)
        self.config = dictKeyToLower(self.config)
        ver = self.getconfig('version', unicode, extraStuff = ' tag')
        if ver != '1.0.0.1':
            self.control_file_error("gpload configuration schema version must be 1.0.0.1")

        # second parse, to check that the keywords are sensible
        y = yaml.compose(doc)

        # first should be MappingNode
        if not isinstance(y, yaml.MappingNode):
            self.control_file_error("configuration file must begin with a mapping")

        yaml_walk(self, y.value, [])
    except yaml.scanner.ScannerError, e:
        self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))
    except yaml.reader.ReaderError, e:
        es = ""
        if isinstance(e.character, str):
            es = "'%s' codec can't decode byte #x%02x: %s position %d" % \
                    (e.encoding, ord(e.character), e.reason, e.position)
        else:
            es = "unacceptable character #x%04x at byte %d: %s" \
                    % (ord(e.character), e.position, e.reason)
        self.log(self.ERROR, es)
    except yaml.error.MarkedYAMLError, e:
        self.log(self.ERROR, "configuration file error: %s, line %s" % \
                (e.problem, e.problem_mark.line))

    f.close()
    self.subprocesses = []
    self.log(self.INFO, 'gpload session started ' + \
            datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
The most important part here is loading the YAML configuration file: it is parsed with yaml.compose, and then yaml_walk validates the configuration keywords. If you want to add a new keyword to the YAML config file, you must add an entry for it to the keyword dictionary, specifying whether the keyword's child keywords should be parsed and what its parent keyword is.
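The keyword dictionary mentioned above can be sketched as a mapping from each keyword to its expected parent and a flag for whether yaml_walk should descend into its children. The entries and the check_keyword helper below are illustrative only; the real dictionary in gpload.py has many more entries and a somewhat richer structure:

```python
# Sketch of the keyword dictionary's shape; entries here are a small,
# hand-picked subset for illustration, not the full gpload set.
valid_tokens = {
    "version":  {'parse_children': True,  'parent': "gpload"},
    "database": {'parse_children': True,  'parent': "gpload"},
    "input":    {'parse_children': True,  'parent': "gpload"},
    "source":   {'parse_children': True,  'parent': "input"},
    "columns":  {'parse_children': False, 'parent': "input"},
}

def check_keyword(keyword, parent):
    """Hypothetical helper: reject a keyword that is unknown or that
    appears under the wrong parent keyword."""
    entry = valid_tokens.get(keyword)
    if entry is None:
        return False
    return entry['parent'] == parent
```

With this shape, a walker visiting `input: source:` accepts "source" because its registered parent is "input", and rejects it anywhere else.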
The gpload run member function
def run2(self):
    self.log(self.DEBUG, 'config ' + str(self.config))
    start = time.time()
    self.read_config()
    self.setup_connection()
    self.read_table_metadata()
    self.read_columns()
    self.read_mapping()
    self.start_gpfdists()
    self.do_method()
    self.log(self.INFO, 'running time: %.2f seconds'%(time.time()-start))
read_config mainly reads the table name; if the name carries a schema, the schema part is assigned to self.schema, otherwise self.schema is None. Parameters such as host, port, user, and database are resolved with the precedence command line > config file > environment variable.
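That precedence rule can be sketched as a small resolver. resolve_setting is a hypothetical helper written for this post, not a function in gpload.py; it just shows the order in which the sources are consulted:

```python
import os

def resolve_setting(cli_value, config_value, env_name, default=None):
    # Precedence as described above: command line beats the config file,
    # which beats the environment variable, which beats the default.
    if cli_value is not None:
        return cli_value
    if config_value is not None:
        return config_value
    return os.environ.get(env_name, default)

# e.g. the database name: -d beats gpload:database, which beats $PGDATABASE
os.environ['PGDATABASE'] = 'envdb'
print(resolve_setting(None, 'confdb', 'PGDATABASE'))     # confdb
print(resolve_setting('clidb', 'confdb', 'PGDATABASE'))  # clidb
print(resolve_setting(None, None, 'PGDATABASE'))         # envdb
```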
setup_connection connects to the database.
read_table_metadata
read_table_metadata reads the target table's metadata. Related posts: common Greenplum SQL — looking up the schema name from a table name; common Greenplum SQL — querying a table's column names, types, and whether a column is backed by a sequence.
def read_table_metadata(self):
    # KAS Note to self. If schema is specified, then probably should use
    # PostgreSQL rules for defining it.
    # find the shema name for this table (according to search_path)
    # if it was not explicitly specified in the configuration file.
    if self.schema is None:
        queryString = """SELECT n.nspname
                         FROM pg_catalog.pg_class c
                         LEFT JOIN pg_catalog.pg_namespace n
                         ON n.oid = c.relnamespace
                         WHERE c.relname = '%s'
                         AND pg_catalog.pg_table_is_visible(c.oid);""" % quote_unident(self.table)

        resultList = self.db.query(queryString.encode('utf-8')).getresult()
        if len(resultList) > 0:
            self.schema = (resultList[0])[0]
            self.log(self.INFO, "setting schema '%s' for table '%s'" % (self.schema, quote_unident(self.table)))
        else:
            self.log(self.ERROR, "table %s not found in any database schema" % self.table)

    queryString = """select nt.nspname as table_schema,
         c.relname as table_name,
         a.attname as column_name,
         a.attnum as ordinal_position,
         format_type(a.atttypid, a.atttypmod) as data_type,
         c.relkind = 'r' AS is_updatable,
         a.atttypid in (23, 20) and a.atthasdef and
         (select position ( 'nextval(' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0 and
                 position ( '::regclass)' in pg_catalog.pg_get_expr(adbin,adrelid) ) > 0
          FROM pg_catalog.pg_attrdef d
          WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef) as has_sequence
          from pg_catalog.pg_class c join pg_catalog.pg_namespace nt on (c.relnamespace = nt.oid)
             join pg_attribute a on (a.attrelid = c.oid)
          where c.relname = '%s' and nt.nspname = '%s'
          and a.attnum > 0 and a.attisdropped = 'f'
          order by a.attnum """ % (quote_unident(self.table), quote_unident(self.schema))

    count = 0
    self.into_columns = []
    self.into_columns_dict = dict()
    resultList = self.db.query(queryString.encode('utf-8')).dictresult()
    while count < len(resultList):
        row = resultList[count]
        count += 1
        ct = unicode(row['data_type'])
        if ct == 'bigserial':
            ct = 'bigint'
        elif ct == 'serial':
            ct = 'int4'
        name = unicode(row['column_name'], 'utf-8')
        name = quote_ident(name)
        if unicode(row['has_sequence']) != unicode('f'):
            has_seq = True
        else:
            has_seq = False
        i = [name, ct, None, has_seq]
        self.into_columns.append(i)
        self.into_columns_dict[name] = i
        self.log(self.DEBUG, "found input column: " + str(i))
    if count == 0:
        # see if it's a permissions issue or it actually doesn't exist
        tableName = quote_unident(self.table)
        tableSchema = quote_unident(self.schema)
        sql = """select 1 from pg_class c, pg_namespace n
                 where c.relname = '%s' and
                 n.nspname = '%s' and
                 n.oid = c.relnamespace""" % (tableName, tableSchema)
        resultList = self.db.query(sql.encode('utf-8')).getresult()
        if len(resultList) > 0:
            self.log(self.ERROR, "permission denied for table %s.%s" % \
                        (tableSchema, tableName))
        else:
            self.log(self.ERROR, 'table %s.%s does not exist in database %s' % (tableSchema, tableName, self.options.d))
After this function, self.into_columns is a list whose elements have the form [name, ct, None, has_seq] (column name, column type, None, whether the column has a sequence), and into_columns_dict is a dict keyed by column name whose values are those same [name, ct, None, has_seq] lists.
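Note that the list and the dict deliberately share the same inner list objects, so an update made through either is visible through the other; this is what later lets read_mapping fill in the third slot (index 2) in place. A small illustration, with invented column data:

```python
# Rebuild the into_columns / into_columns_dict structures with fake rows.
# Each inner list is [column name, column type, mapping (filled later), has_seq].
into_columns = []
into_columns_dict = {}
for name, ct, has_seq in [('id', 'int4', True), ('name', 'text', False)]:
    i = [name, ct, None, has_seq]
    into_columns.append(i)
    into_columns_dict[name] = i   # same object, not a copy

# Updating the mapping slot through the dict is visible through the list.
into_columns_dict['name'][2] = 'name'
print(into_columns[1])  # ['name', 'text', 'name', False]
```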
read_columns
read_columns lets the user specify which columns the imported data corresponds to, building from_columns, whose elements have the form (column name, column type, None, False). If this is not configured, from_columns is simply set to into_columns.
def read_columns(self):
    columns = self.getconfig('gpload:input:columns', list, None, returnOriginal=True)
    if columns != None:
        self.from_cols_from_user = True # user specified from columns
        self.from_columns = []
        for d in columns:
            if type(d)!=dict:
                self.control_file_error("gpload:input:columns must be a sequence of YAML mappings")
            tempkey = d.keys()[0]
            value = d[tempkey]
            """ remove leading or trailing spaces """
            d = { tempkey.strip() : value }
            key = d.keys()[0]
            if d[key] is None:
                self.log(self.DEBUG,
                         'getting source column data type from target')
                for name, typ, mapto, hasseq in self.into_columns:
                    if sqlIdentifierCompare(name, key):
                        d[key] = typ
                        break

            # perform the same kind of magic type change that postgres does
            if d[key] == 'bigserial':
                d[key] = 'bigint'
            elif d[key] == 'serial':
                d[key] = 'int4'

            # Mark this column as having no mapping, which is important
            # for do_insert()
            self.from_columns.append([key.lower(), d[key].lower(), None, False])
    else:
        self.from_columns = self.into_columns
        self.from_cols_from_user = False

    # make sure that all columns have a type
    for name, typ, map, hasseq in self.from_columns:
        if typ is None:
            self.log(self.ERROR, 'column "%s" has no type ' % name +
                     'and does not appear in target table "%s"' % self.schemaTable)
    self.log(self.DEBUG, 'from columns are:')
    for c in self.from_columns:
        name = c[0]
        typ = c[1]
        self.log(self.DEBUG, '%s: %s'%(name,typ))
read_mapping
If a mapping is configured, each mapping key is looked up against the column names in self.into_columns; when a match is found, the mapped value is stored in the third slot (index 2) of the corresponding element. If no mapping is configured, every column yet to be mapped is mapped to itself, one to one.
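These two branches can be sketched with a stripped-down, stand-alone function. apply_mapping is written for this post, not taken from gpload.py: it drops the case-insensitive sqlIdentifierCompare matching, the from_columns cross-check, and the logging, and keeps only the fill-in logic on the [name, type, mapto, has_seq] lists:

```python
def apply_mapping(into_columns, mapping=None):
    """Sketch of read_mapping: each element of into_columns is a list
    [name, type, mapto, has_seq]; slot 2 receives the mapping."""
    if mapping:
        for key, value in mapping.items():
            for col in into_columns:
                if col[0] == key:          # real code compares SQL identifiers
                    col[2] = value
                    break
            else:
                raise ValueError('%s in mapping is not in the target table' % key)
    else:
        # No explicit mapping: map each still-unmapped column to itself.
        for col in into_columns:
            if col[2] is None:
                col[2] = col[0]
    return into_columns
```

The for/else idiom mirrors the found flag in the original: the else branch fires only when no target column matched the mapping key.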
def read_mapping(self):
    mapping = self.getconfig('gpload:output:mapping', dict, None, returnOriginal=True)

    if mapping:
        for key, value in mapping.iteritems():
            if type(key) != unicode or type(value) != unicode:
                self.control_file_error("gpload:output:mapping must be a YAML type mapping from strings to strings")
            found = False
            for a in self.into_columns:
                if sqlIdentifierCompare(a[0], key) == True:
                    a[2] = value
                    found = True
                    break
            if found == False:
                self.log(self.ERROR, '%s in mapping is not in table %s' % \
                        (key, self.schemaTable))
    else:
        # Now, map anything yet to be mapped to itself, picking up on those
        # columns which are not found in the table.
        for x in self.from_columns:
            # Check to see if it already has a mapping value
            i = filter(lambda a: a[2] == x[0], self.into_columns)
            if not i:
                # Check to see if the target column names match the input column names.
                for a in self.into_columns:
                    if sqlIdentifierCompare(a[0], x[0]) == True:
                        i = a
                        break
                if i:
                    if i[2] is None: i[2] = i[0]
                else:
                    self.log(self.ERROR, 'no mapping for input column ' +
                             '"%s" to output table' % x[0])
    for name, typ, mapto, seq in self.into_columns:
        self.log(self.DEBUG, '%s: %s = %s'%(name,typ,mapto))
start_gpfdists
To be analyzed in a follow-up post.
do_method
If preload is enabled, do_method checks whether the table needs to be truncated first (allowed only in insert mode) and truncates it if so. It reads reuse_tables, fast_match, and staging_table from the config file. Since GP5 does not support error_table, if that feature is configured, reuse_tables and log_errors are set to true.
It then executes the pre SQL and dispatches to the function for the configured mode.
In merge or update mode, staging_table_name must be truncated afterwards.
Finally it executes the after SQL; if no_auto_trans is not enabled and the mode is not insert, a COMMIT is issued.
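The overall control flow described above can be sketched as a pure function that returns the ordered steps; this is an illustration written for the post (the handler and SQL names are stand-ins), not gpload code:

```python
def do_method(mode, no_auto_trans, before=None, after=None):
    """Sketch of do_method's flow: wrap non-insert modes in a transaction
    unless no_auto_trans is set, run sql:before, dispatch on the mode,
    truncate the staging table for merge/update, run sql:after, COMMIT."""
    steps = []
    auto_trans = not no_auto_trans and mode != 'insert'
    if auto_trans:
        steps.append('BEGIN')
    if before:
        steps.append(before)
    steps.append('do_method_%s' % mode)   # insert / update / merge handler
    if mode in ('merge', 'update'):
        steps.append('TRUNCATE staging')  # avoid dumping staging content (MPP-15474)
    if after:
        steps.append(after)
    if auto_trans:
        steps.append('COMMIT')
    return steps
```

For example, a merge load with pre/post SQL produces BEGIN, the pre SQL, the merge handler, the staging truncate, the post SQL, and COMMIT, in that order, while a plain insert runs only its handler.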
def do_method(self):
    # Is the table to be truncated before the load?
    preload = self.getconfig('gpload:preload', list, default=None)
    method = self.getconfig('gpload:output:mode', unicode, 'insert').lower()
    self.log_errors = self.getconfig('gpload:input:log_errors', bool, False)
    truncate = False
    self.reuse_tables = False

    if not self.options.no_auto_trans and not method=='insert':
        self.db.query("BEGIN")

    if preload:
        truncate = self.getconfig('gpload:preload:truncate', bool, False)
        self.reuse_tables = self.getconfig('gpload:preload:reuse_tables', bool, False)
        self.fast_match = self.getconfig('gpload:preload:fast_match', bool, False)
        if self.reuse_tables == False and self.fast_match == True:
            self.log(self.WARN, 'fast_match is ignored when reuse_tables is false!')
        self.staging_table = self.getconfig('gpload:preload:staging_table', unicode, default=None)
    if self.error_table:
        self.log_errors = True
        self.reuse_tables = True
    if truncate == True:
        if method=='insert':
            self.do_truncate(self.schemaTable)
        else:
            self.log(self.ERROR, 'preload truncate operation should be used with insert ' +
                                 'operation only. used with %s' % method)

    # sql pre or post processing?
    sql = self.getconfig('gpload:sql', list, default=None)
    before = None
    after = None
    if sql:
        before = self.getconfig('gpload:sql:before', unicode, default=None)
        after = self.getconfig('gpload:sql:after', unicode, default=None)
    if before:
        self.log(self.LOG, "Pre-SQL from user: %s" % before)
        if not self.options.D:
            try:
                self.db.query(before.encode('utf-8'))
            except Exception, e:
                self.log(self.ERROR, 'could not execute SQL in sql:before "%s": %s' %
                         (before, str(e)))

    if method=='insert':
        self.do_method_insert()
    elif method=='update':
        self.do_method_update()
    elif method=='merge':
        self.do_method_merge()
    else:
        self.control_file_error('unsupported method %s' % method)

    # truncate the staging table to avoid dumping it's content - see MPP-15474
    if method=='merge' or method=='update':
        self.do_truncate(self.staging_table_name)

    if after:
        self.log(self.LOG, "Post-SQL from user: %s" % after)
        if not self.options.D:
            try:
                self.db.query(after.encode('utf-8'))
            except Exception, e:
                self.log(self.ERROR, 'could not execute SQL in sql:after "%s": %s' %
                         (after, str(e)))

    if not self.options.no_auto_trans and not method=='insert':
        self.db.query("COMMIT")
Pay particular attention to the reuse_tables, fast_match, staging_table, and log_errors options.