base.py依赖的python包(Queue,threading,os,signal,subprocess/subprocess32,sys,time,warnings,paramiko,getpass),依赖的gp包(gplog,gpsubprocess,pygresql)。pygresql导入语句的是from pygresql.pg import DB,主要使用的DB是SQLCommand类,这个类先不用关注。gpsubprocess是对subprocess的封装,可以看到这里使用了两个子进程包gpsubprocess和subprocess。

代码分析

WorkerPool类

先看WorkPool类定义,类实例包含了存放worker实例的列表、存放带执行Command的work_queue队列、存放执行完Command的completed_queue队列。WorkerPool中的worker实例的数量是在构造函数就给定的,初始化后worker实例会一直运行,图中的start就是在构造函数中完成的。Worker实例从work_queue队列中取工作项Command的函数是getNetWorkItem。

def getNextWorkItem(self):return self.work_queue.get(block=True)

Worker实例处理完成work_queue中所有命令之后,取不到命令或取到的命令是halt_command,或者任务池标志了should_stop之后,使用markTaskDone函数告知WorkPool该任务完成。

def markTaskDone(self):self.work_queue.task_done()

clsSystemState.py的GpSystemStateProgram类中run函数中有base文件中类的使用示例,简化如下,通过这些示例来学习任务池的使用:

dispatchCount = 0
pool = base.WorkerPool(parallelDegree) #parallelDegree给定worker个数
for hostName, segments in ...:cmd = ...hostNameToCmd[hostName] = cmdpool.addCommand(cmd)dispatchCount+=1
pool.wait_and_printdots(dispatchCount)
hostNameToResults = {}
for hostName, cmd in hostNameToCmd.iteritems():hostNameToResults[hostName] = cmd.decodeResults() #取出结果集
pool.haltWork()

主要流程:通过addCommand函数向队列(work_queue)中添加工作负载(WorkerPool类可以通过构造函数向队列中添加多个命令(列表形式),或者通过addCommand函数添加单个命令)。针对work_queue队列由三种join的方法,代码如下。hostNameCmd是一个字典,键为hostName,值为cmd。通过cmd.decodeResults函数取出结果集。

def join(self):self.work_queue.join()return True
def _join_work_queue_with_timeout(self, timeout):"""Queue.join() unfortunately doesn't take a timeout (seehttps://bugs.python.org/issue9634). Fake it here, with a solutioninspired by notes on that bug report.XXX This solution uses undocumented Queue internals (though they are notunderscore-prefixed...)."""done_condition = self.work_queue.all_tasks_donedone_condition.acquire()try:while self.work_queue.unfinished_tasks:if (timeout <= 0):# Timed out.returnstart_time = time.time()done_condition.wait(timeout)timeout -= (time.time() - start_time)finally:done_condition.release()
def wait_and_printdots(self, command_count, quiet=True):while self.completed_queue.qsize() < command_count:time.sleep(1)if not quiet:sys.stdout.write(".")sys.stdout.flush()if not quiet:print " "self.join()


Queue/queue模块的类

属性 描述
Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则,为无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则,为无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,则在队列没有空间时阻塞,否则,为无限队列
Queue/queue异常
属性 描述
Empty
当对孔队列调用get*()方法时抛出异常
Full 当对已满的队列调用put*()方法时抛出异常

Worker类

worker类继承自threading模块中的Thread类,run函数先使用getNextWorkItem函数取得command,总共有四种情况:任务池中没有command,该Worker示例需要向任务池标记任务完成;如果取得的命令是pool.halt_command,该Worker示例需要向任务池标记任务完成;如果任务池标记了should_stop,该Worker示例需要向任务池标记任务完成;下面是正常流程,执行命令,并将命令放入任务池完成队列。

class Woker(Thread):...def run(self):while True:try:try:self.cmd = self.pool.getNextWorkItem()except TypeError:# misleading exception raised during interpreter shutdownreturn# we must have got a command to run hereif self.cmd is None:self.logger.debug("[%s] got a None cmd" % self.name)self.pool.markTaskDone()elif self.cmd is self.pool.halt_command:self.logger.debug("[%s] got a halt cmd" % self.name)self.pool.markTaskDone()self.cmd = Nonereturnelif self.pool.should_stop:self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))self.pool.markTaskDone()self.cmd = Noneelse:self.logger.debug("[%s] got cmd: %s" % (self.name, self.cmd.cmdStr))self.cmd.run()self.logger.debug("[%s] finished cmd: %s" % (self.name, self.cmd))self.pool.addFinishedWorkItem(self.cmd)self.cmd = Noneexcept Exception, e:self.logger.exception(e)if self.cmd:self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))self.pool.addFinishedWorkItem(self.cmd)self.cmd = Nonedef haltWork(self):self.logger.debug("[%s] haltWork" % self.name)  c = self.cmdif c is not None and isinstance(c, Command):c.interrupt()c.cancel()

threading模块的Thread类实例化表示一个执行线程的对象,拥有的数据属性name(线程名)、ident(线程的标识符)、daemon(布尔标志,表示这个线程是否是守护线程),方法如下:
Thread对象方法描述
_ini_(group=None, target=None, name=None, args=(), kwargs={}, verbose=None, daemon=None) 实例化一个线程对象,需要有一个可调用的target,以及其参数args或kwargs。还可以传递name或group参数,不过后者还未实现。此外,verbose标志也是可以接受的。而daemon的值将会设定thread.daemon属性/标志

成员 描述
start() 开始执行该线程
run() 定义线程功能的方法(通常在子类中被应用开发者重写)
join(timeout=None) 直至启动的线程终止之前一直挂起;除非给出timeout(秒),否则会一直阻塞
getName() 返回线程名
setName(name) 设定线程名
isAlivel/is_alive() 布尔标志,表示这个线程是否还存活
isDaemon() 如果是守护线程,则返回True;否则,返回False
setDaemon(daemonic) 把线程的守护标志设定为布尔值daemonic(必须在线程start()之前调用)

使用Thread类可以有很多方法创建线程,这里介绍三种方法:1. 创建Thread的实例,传给它一个函数;2. 创建Thread的实例,传给它一个可调用的类实例;3.派生Thread的子类,并创建子类的实例。

Command类

Command类有两个执行函数runNoWait和run函数,runNoWait函数通过调用exec_context.execute(self,wait=False)函数执行命令,并返回proc;run函数直接调用exec_context.execute(self)函数。

    def runNoWait(self):faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')if not faultPoint or (self.name and not self.name.startswith(faultPoint)):self.exec_context.execute(self, wait=False)return self.exec_context.procdef run(self, validateAfter=False):faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')if not faultPoint or (self.name and not self.name.startswith(faultPoint)):self.exec_context.execute(self)else:# simulate errorself.results = CommandResult(1, 'Fault Injection', 'Fault Injection', False, True)if validateAfter:self.validate()pass

ExecutionContext类

以RemoteExecutionContext执行上下文类为例,参数是以以下方法处理的
keys = sorted(cmd.propagate_env_map.keys(), reverse=True) for k in keys: cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr),将参数序列化到cmdStr中。对于LocalExecutionContext来说,调用如下命令执行命令:self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True),对GP封装的subprocess模式请参见相应其他系列的博客。如果需要等待子进程,则调用(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin),然后使用cmd.set_results(CommandResult( rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt)) def cancel(self, cmd):封装命令返回的结果。

class LocalExecutionContext(ExecutionContext):proc = Nonehalt = Falsecompleted = Falsedef __init__(self, stdin):ExecutionContext.__init__(self)self.stdin = stdinpassdef execute(self, cmd, wait=True):# prepend env. variables from ExcecutionContext.propagate_env_map# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."# also propagate env from command instance specific mapkeys = sorted(cmd.propagate_env_map.keys(), reverse=True)for k in keys:cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)# executable='/bin/bash' is to ensure the shell is bash.  bash isn't the# actual command executed, but the shell that command string runs under.self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,executable='/bin/bash',stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE, close_fds=True)cmd.pid = self.proc.pidif wait:(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin)self.completed = Truecmd.set_results(CommandResult(rc, "".join(stdout_value), "".join(stderr_value), self.completed, self.halt))def cancel(self, cmd):if self.proc:try:os.kill(self.proc.pid, signal.SIGTERM)except OSError:passdef interrupt(self, cmd):self.halt = Trueif self.proc:self.proc.cancel()class RemoteExecutionContext(LocalExecutionContext):trail = set()def __init__(self, targetHost, stdin, gphome=None):LocalExecutionContext.__init__(self, stdin)self.targetHost = targetHostif gphome:self.gphome = gphomeelse:self.gphome = GPHOMEdef execute(self, cmd):# prepend env. variables from ExcecutionContext.propagate_env_map# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."self.__class__.trail.add(self.targetHost)# also propagate env from command instance specific mapkeys = sorted(cmd.propagate_env_map.keys(), reverse=True)for k in keys:cmd.cmdStr = "%s=%s && %s" % (k, cmd.propagate_env_map[k], cmd.cmdStr)# Escape " for remote execution otherwise it interferes with sshcmd.cmdStr = cmd.cmdStr.replace('"', '\\"')cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 " \"{targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost,gphome=". %s/greenplum_path.sh;" % self.gphome,cmdstr=cmd.cmdStr)LocalExecutionContext.execute(self, cmd)if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):self.__retry(cmd)passdef __retry(self, cmd, count=0):if count == SSH_MAX_RETRY:returntime.sleep(SSH_RETRY_DELAY)LocalExecutionContext.execute(self, cmd)if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):self.__retry(cmd, count + 1)

而RemoteExecutionContext先对参数进行格式化后,还需要对双引号使用双斜线进行替换后,然后添加ssh相关命令选项,代码如下所示:cmd.cmdStr = cmd.cmdStr.replace('"', '\\"') cmd.cmdStr = "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=60 {targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost, gphome=". %s/greenplum_path.sh;" % self.gphome, cmdstr=cmd.cmdStr),然后调用LocalExecutionContext父类的execute函数。如果返回的结果包含ssh_exchange_identification: Connection closed by remote host,则需要进行等待相应的时间,然后进行重试。

Controller-Worker架构模式


辅助说明:
Controller-Worker是一种组合架构模式,Controller基于Client的参数动态生成Woker数量,并控制Woker的生命周期,如创建和终止。
Controller属性:
Controller事先知道自身拥有的Woker类型。
Controller依赖一个工作任务池,通过工作任务池Controller监控整体任务执行情况。
Worker属性:
Worker并行消费工作任务池中任务,并把执行结果返回到任务池中。
Worker彼此间没有任何耦合。

辅组说明:
Controller通过WorkerPool和Worker进行命令传递。
Controller通过超时机制,保证最后一定有命令结果返回给Client
Controller通过halt命令,停止所有的Woker
Worker采用Thread方式来实现。
Worker1、Worker2、WorkerN无差别,根据获取的Cmd,通过ssh方式在对应的Host执行命令。

Greenplum Python专用库gppylib学习——base.py相关推荐

  1. Greenplum Python专用库gppylib学习——GpArray

    gparray.py依赖的python包(datetime.copy.traceback.os),依赖的gp包(gplog.utils.db.gpversion.commands.unix) from ...

  2. Python第三方库pygame学习笔记(一)

    Pygame Python最经典的2D游戏开发第三方库,也支持3D游戏开发 Pygame适合用于游戏逻辑验证.游戏入门及系统演示验证 Pygame是一种游戏开发引擎,基本逻辑具有参考价值 pygame ...

  3. python的库怎么学习_怎样学习一个Python 库 ?

    什么是Python 库? python 自称是带电池的语言,在于其拥有大量的库,每个库都是某一行业比较顶尖的人才开发出来完成某一任务的代码集合.库提供一套解决方案,要用Python几乎离不开对几个库的 ...

  4. 《利用python进行数据分析》第二版 第13章-Python建模库介 学习笔记

    文章目录 一.pandas与建模代码结合 二.用patsy创建模型描述 Patsy公式中的数据转换 分类数据与Pastsy 三.statsmodels介绍 评估线性模型 评估时间序列处理 四.scik ...

  5. Python图像处理库PIL -- 学习资源

    Resources 官方指南PIL Handbook(建议看英文,翻译) pil下载 安装中文指南 from the PythonWare PIL home page 推荐先看Python Imagi ...

  6. 学python要考什么证-这十个Python常用库,学习Python的你必须要知道!

    ,包括原生库和第三方库.不过,有这么多Python库,有些库得不到应有的关注也就不足为奇了. 注意:很多人学Python过程中会遇到各种烦恼问题,没有人帮答疑.为此小编建了个Python全栈免费答疑交 ...

  7. python 没找到库_这十个Python常用库,学习Python的你必须要知道!

    包括原生库和第三方库.不过,有这么多Python库,有些库得不到应有的关注也就不足为奇了. 注意:很多人学Python过程中会遇到各种烦恼问题,没有人帮答疑.为此小编建了个Python全栈免费答疑交流 ...

  8. python常用的库有哪些餐厅_这十个Python常用库,学习Python的你必须要知道!

    想知道Python取得如此巨大成功的原因吗?只要看看Python提供的大量库就知道了 包括原生库和第三方库.不过,有这么多Python库,有些库得不到应有的关注也就不足为奇了.此外,只在一个领域里的工 ...

  9. Python机器学习库CatBoost学习使用

    最近,接触到一个比较新颖的Boost方法的机器学习库,觉得很有意思的,号称通用性很强,所以拿来上手试试,这里只是初步的学习使用,相关的参考链接放在下面. CatBoost是俄罗斯的搜索巨头Yandex ...

最新文章

  1. zookeeper 客户端_zookeeper进阶-客户端源码详解
  2. 【解决方案】QT读写文件
  3. 用狄拉克函数来构造非光滑函数的光滑近似
  4. wait/waitpid函数与僵尸进程、fork 2 times
  5. Codeforces第一次rated比赛
  6. Android插件化开发基础之静态代理模式
  7. 「JXOI2018」游戏
  8. 实信号变成解析信号的实现方法
  9. 计算机常用术语缩写及英文
  10. centos7.6安装Kubernetes1.14.1集群
  11. 从 Exadata 到 TiDB,中通快递 HTAP 实践
  12. 博通网卡管理软件Linux,博通网卡管理软件
  13. redis数据结构分析-redisObject-SDS
  14. python显示代码运行时间_python测量代码运行时间方法
  15. 微信小程序rich-text图片不显示及图片过大问题解决办法
  16. Ubuntu系统切换五笔输入法
  17. 安卓Android手机汽车租赁系统app毕业设计
  18. Windows安全机制——UAC(用户权限控制)
  19. 详解 图像旋转变换 原理
  20. 负数与无符号数的转变

热门文章

  1. Linux常用命令——top命令
  2. SQL Server 异常 COM 类公司中CLSID 为 {10021F00-E260-11CF-AE68-00AA004A34D5} 的组件时失败,原因是出现以下错误: 80040154
  3. 蓝背抠像 绿背抠像 算法,实时视频抠像算法 视频直播抠像
  4. shopee一件代发怎么算运费?计算方式是什么?
  5. 编程语言排行榜没有html,TIOBE:2019年12月全球编程语言排行榜
  6. 最好和最便宜的照片存储网站(优质图库摄影)
  7. 2018总结----对共享单车的思考
  8. 三角分解(LU分解)
  9. 光敏电阻与光电二极管的区别
  10. Hive3.1.3 安装配置