Python--DBUtil包

1 简介

DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来自Webware for Python。

DBUtils提供两种外部接口:

  • PersistentDB :提供线程专用的数据库连接,并自动管理连接。
  • PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

实测证明 PersistentDB 的速度是最高的,但是在某些特殊情况下,数据库的连接过程可能异常缓慢,而此时的PooledDB则可以提供相对来说平均连接时间比较短的管理方式。

另外,实际使用的数据库驱动也有所依赖,比如SQLite数据库只能使用PersistentDB作连接池。 下载地址:http://www.webwareforpython.org/downloads/DBUtils/

安装

pip install DBUtils

2 使用方法

连接池对象只初始化一次,一般可以作为模块级代码来确保。 PersistentDB的连接例子:

import DBUtils.PersistentDB
persist=DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)

这里的参数dbpai指使用的底层数据库模块,兼容DB-API的。maxusage则为一个连接最大使用次数,参考了官方例子。后面的**kwargs则为实际传递给MySQLdb的参数。

获取连接: conn=persist.connection() 实际编程中用过的连接直接关闭 conn.close() 即可将连接交还给连接池。

PooledDB使用方法同PersistentDB,只是参数有所不同。

  • * dbapi :数据库接口,使用链接数据库的模块
  • * mincached :初始化时,链接池中至少创建的空闲的链接,0表示不创建
  • * maxcached :连接池最大可用连接数量
  • * maxshared :连接池最大可共享连接数量
  • * maxconnections : 连接池允许的最大连接数,0和None表示不限制连接数
  • * blocking :达到最大数量时是否阻塞
  • * maxusage :单个连接最大复用次数
  • * setsession :用于传递到数据库的准备会话,如 [”set name UTF-8″] 。

一个使用过程:

import os
import cx_Oracle
# 用于以清晰、可读的形式输出 Python 数据结构
from pprint import pprint
from sys import modules
from DBUtils.PooledDB import PooledDBpool= PooledDB(cx_Oracle,user='test',password='test',dsn='testDB',mincached=5, maxcached=20)
print(pool.connection())
print(connection.version)# 获得游标对象
cursor = pool.connection().cursor ()try:# 解析sql语句cursor.parse("select *  dual")# 捕获SQL异常
except cx_Oracle.DatabaseError as e:print(e)   # ORA-00923: 未找到要求的 FROM 关键字# 执行sql 语句
cursor.execute ("select * from dual")
# 提取一条数据,返回一个元祖
row = cursor.fetchone()
pprint(row)  # ('X',)

3.DBUtil功能

功能

SimplePooledDB
DBUtils.SimplePooledDB 是一个非常简单的数据库连接池实现。他比完善的 PooledDB 模块缺少很多功能。 DBUtils.SimplePooledDB 本质上类似于 MiscUtils.DBPool 这个Webware的组成部分。你可以把它看作一种演示程序。

SteadyDB

DBUtils.SteadyDB 是一个模块实现了”强硬”的数据库连接,基于DB-API 2建立的原始连接。一个”强硬”的连接意味着在连接关闭之后,或者使用次数操作限制时会重新连接。

一个典型的例子是数据库重启时,而你的程序仍然在运行并需要访问数据库,或者当你的程序连接了一个防火墙后面的远程数据库,而防火墙重启时丢失了状态时。

一般来说你不需要直接使用 SteadyDB 它只是给接下来的两个模块提供基本服务, PersistentDB 和 PooledDB 。

PersistentDB

DBUtils.PersistentDB 实现了强硬的、线程安全的、顽固的数据库连接,使用DB-API 2模块。如下图展示了使用 PersistentDB 时的连接层步骤:

persist.gif当一个线程首次打开一个数据库连接时,一个连接会打开并仅供这个线程使用。当线程关闭连接时,连接仍然持续打开供这个线程下次请求时使用这个已经打开的连接。连接在线程死亡时自动关闭。

简单的来说 PersistentDB 尝试重用数据库连接来提高线程化程序的数据库访问性能,并且他确保连接不会被线程之间共享。

因此, PersistentDB 可以在底层DB-API模块并非线程安全的时候同样工作的很好,并且他会在其他线程改变数据库会话或者使用多语句事务时同样避免问题的发生。

PooledDB

DBUtils.PooledDB 实现了一个强硬的、线程安全的、有缓存的、可复用的数据库连接,使用任何DB-API 2模块。如下图展示了使用 PooledDB 时的工作流程:

pool.gif如图所示 PooledDB 可以在不同线程之间共享打开的数据库连接。这在你连接并指定 maxshared 参数,并且底层的DB-API 2接口是线程安全才可以,但是你仍然可以使用专用数据库连接而不在线程之间共享连接。除了共享连接以外,还可以设立一个至少 mincached 的连接池,并且最多允许使用 maxcached 个连接,这可以同时用于专用和共享连接池。当一个线程关闭了一个非共享连接,则会返还到空闲连接池中等待下次使用。

如果底层DB-API模块是非线程安全的,线程锁会确保使用 PooledDB 是线程安全的。所以你并不需要为此担心,但是你在使用专用连接来改变数据库会话或执行多命令事务时必须小心。
该选择哪一个?

PersistentDB 和 PooledDB 都是为了重用数据库连接来提高性能,并保持数据库的稳定性。

所以选择何种模块,可以参考上面的解释。 PersistentDB 将会保持一定数量的连接供频繁使用。在这种情况下你总是保持固定数量的连接。如果你的程序频繁的启动和关闭线程,最好使用 PooledDB 。后面将会提到更好的调整,尤其在使用线程安全的DB-API 2模块时。

当然,这两个模块的接口是很相似的,你可以方便的在他们之间转换,并查看哪个更好一些。

4.使用PooledDB 操作ORALCE数据库案例

settings.py
import os'''
日志文件设置
'''
LOG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_LEVEL = 'DEBUG'
LOG_FILE = 'ops.log'"""
数据库设置
"""
DB_USER = 'XXX'
DB_PASSWORD = 'XXX'
DB_SID = 'XXX'print(LOG_DIR)

my_logset.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/4/23 8:55
# @Author  : hyang
# @File    : my_logset.py
# @Software: PyCharmimport logging
import os
import sys
from logging import handlers
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入环境变量from utils import settings# 日志格式
log_format = '[%(asctime)s - %(levelname)s - %(name)s - %(filename)s - %(funcName)s- %(lineno)d ] %(message)s 'def get_mylogger(name):"""get log:param name::return:"""logger = logging.getLogger(name)logger.setLevel(settings.LOG_LEVEL)console_handler = logging.StreamHandler()# 文件绝对路径logfile_path = os.path.join(settings.LOG_DIR, "log", settings.LOG_FILE)if not os.path.exists(logfile_path):# 创建log目录os.mkdir(os.path.join(settings.LOG_DIR, "log"))# 每天创建一个日志文件,文件数不超过20个file_handler = handlers.TimedRotatingFileHandler(logfile_path, when="D", interval=1, backupCount=25)logger.addHandler(console_handler)logger.addHandler(file_handler)file_format = logging.Formatter(fmt=log_format)console_format = logging.Formatter(fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')console_handler.setFormatter(console_format)file_handler.setFormatter(file_format)return loggerif __name__ == '__main__':log = get_mylogger('access')log.info('access')log.error('Error')#
    # log1 = get_mylogger('trans')# log1.info('trans')

Oracle_util.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/5/22 13:17
# @Author  : hyang
# @File    : Oracle_utils.py
# @Software: PyCharm# 用于以清晰、可读的形式输出 Python 数据结构
from sys import modules
import sys
import os
import cx_Oracle
from DBUtils.PooledDB import PooledDBBASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入环境变量from utils import settings
from utils import my_logset"""
通过PooledDB连接Oracle,并完成常用一些操作
"""
class Oracle_util(object):__pool = None  # 连接池对象_db_info = {'user': settings.DB_USER,'pwd': settings.DB_PASSWORD,'dsn': settings.DB_SID}def __init__(self, db_info=_db_info, arraysize = 500):# 日志self.db_log = my_logset.get_mylogger("oradb_access")self.db_info = db_infoself.conn = Oracle_util.__getConn(db_info)self.cursor = self.conn.cursor()# 每次从数据库向Python的缓存返回arraysize=100条记录self.cursor.arraysize = arraysize@staticmethoddef __getConn(db_info):# 静态方法,从连接池中取出连接if Oracle_util.__pool is None:__pool = PooledDB(cx_Oracle,user=db_info['user'],password=db_info['pwd'],dsn=db_info['dsn'],mincached=20,maxcached=50)return __pool.connection()def get_columns(self, table):# 查询表的所有列sql = ["select lower(column_name) column_name \from user_tab_columns where table_name=upper('%(table)s')"]rows = self.queryBySql(''.join(sql) % locals())col_list = [k["column_name"] for k in rows]# ['sjhm', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'status']return col_list# 根据表自动创建参数字典def create_params(self, table, args={}):col_list = self.get_columns(table)params = {}for k in col_list:if args.__contains__(k):params[k] = args[k]return params# 执行sqldef execute(self, sql, args={}):try:self.db_log.debug('execute sql:{}'.format(sql))return self.cursor.execute(sql, args)except Exception as e:self.close()raise e# 调用函数 函数名,返回类型, 参数('1',2)元祖类型def callfunc(self, func, ret_type=cx_Oracle.NUMBER, args=()):try:self.db_log.debug('call func:{} {}'.format(func, args))return self.cursor.callfunc(func,ret_type,args)except Exception as e:self.close()raise e# 调用过程 过程名,输入参数('1',2)元祖类型def callproc(self, proc, in_val=()):try:self.db_log.debug('call proc:{} {}'.format(proc,in_val))return self.cursor.callproc(proc, in_val)except Exception as e:self.close()raise e# 解析sqldef parse(self,sql,args={}):try:# 解析sql语句return self.cursor.parse(sql,args)# 捕获SQL异常except Exception as e:self.close()raise e# 批量执行def executemany(self, sql, args):try:self.db_log.debug('executemany sql:{}'.format(sql))return self.cursor.executemany(sql, args)except Exception as e:self.close()raise e# 执行sql,参数一:table,参数二:查询列'col1,col2' 参数三:参数字典{'字段1':'值1','字段2':'值2'}def queryByTable(self, table, column='*',cond_dict={}):# self.execute(sql, args)cond_dict = self.create_params(table, cond_dict)cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])# del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'if not cond_dict:query_sql = 'select %(column)s FROM %(table)s'else:query_sql = 'select %(column)s FROM %(table)s where %(cond_stmt)s'self.execute(query_sql % locals(), cond_dict)return self.get_rows()# 执行sql,参数一:sql语句,如select * from python_modules where module_name=:module_name# 参数二:参数字典{'字段1':'值1','字段2':'值2'} 如{module_name:Oracle}def queryBySql(self, sql, args={}):self.execute(sql, args)return self.get_rows()# 导出结果为文件def exportTxt(self,file_name, sql, args={}, col_split='|', col_flg=True):""":param file_name: 文件位置:param sql:  sql语句 如select module_name,china_name from python_modules where module_name=:module_name:param args:  参数 如{'module_name':'oracle'}:param col_split: 列分隔符:param col_flg: 是否输出列名字段col1|col2:return:"""rt = self.queryBySql(sql, args)if rt:with open(file_name, 'w',encoding="utf-8") as fd:for row in rt:col_info = col_split.join(row.keys())val_info = ''if col_flg:fd.write(col_info+"\n")col_flg = Falseval_info += col_split.join(row.values())val_info += '\n'fd.write(val_info)# 分页查询,参数一:sql语句,参数二:参数字典{'字段1':'值1','字段2':'值2'},参数三:页码,参数四:分页大小def query_pages(self, sql, args={}, page=1, page_size=30):_args, count_args = args, argspage = int(page)# print "page:%s" %(page,)# 下一页next_page = page_size * page# 当前页cur_page = page_size * (page - 1)if page == 1 or cur_page < 0:cur_page = 0next_page = page_sizesql = """SELECT * FROM(SELECT ROWNUM RN,T.* FROM(""" + sql + """)T WHERE ROWNUM<=:next_page)WHERE RN >=:cur_page """count_sql = """SELECT COUNT(1)CNT FROM (""" + sql + """)"""_args["cur_page"] = cur_page_args["next_page"] = next_pagerows = self.queryBySql(sql, _args)countrows = self.queryBySql(count_sql, count_args)return rows, countrows[0]['cnt']# oracle的参数名必须使用:代替,如 userid = :useriddef insertOne(self, table, column_dict):column_dict = self.create_params(table, column_dict)keys = ','.join(column_dict.keys())values = column_dict.values()placeholder = ','.join([':%s' % (v) for v in column_dict.keys()])ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES (%(placeholder)s)'# print(ins_sql % locals())self.execute(ins_sql % locals(), column_dict)# 获取序列的下一个值,传入sequence的名称def nextval(self, seq):self.cursor.execute("SELECT %(seq)s.nextval from dual " % locals())result = self.cursor.fetchall()return result[0][0]# 批量插入数据库,参数一:表名,参数二:['字段1','字段2',...],参数二:[('值1','值2',...),('值1','值2',...)]def insertMany(self, table, columns=[], values=[]):keys = ','.join(columns)placeholder = ','.join([':%s' % (v) for v in columns])ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES(%(placeholder)s)'self.executemany(ins_sql % locals(), values)return self._get_rows_num()# 更新,参数一:表名,参数二用于set 字段1=值1,字段2=值2...格式:{'字段1':'值1','字段2':'值2'},# 参数三:用于where条件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}def updateByTable(self, table, column_dict={}, cond_dict={}):column_dict = self.create_params(table, column_dict)cond_dict = self.create_params(table, cond_dict)set_stmt = ','.join(['%s=:%s' % (k, k) for k in column_dict.keys()])cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])if not cond_dict:upd_sql = 'UPDATE %(table)s set %(set_stmt)s'else:upd_sql = 'UPDATE %(table)s set %(set_stmt)s where %(cond_stmt)s'args = dict(column_dict, **cond_dict)  # 合并成1个self.execute(upd_sql % locals(), args)return self._get_rows_num()# 删除,参数一:表名,#参数二:用于where条件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}def deleteByTable(self, table, cond_dict={}):cond_dict = self.create_params(table, cond_dict)cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])# del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'if not cond_dict:del_sql = 'DELETE FROM %(table)s'else:del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'self.execute(del_sql % locals(), cond_dict)return self._get_rows_num()# 提取数据,参数一提取的记录数,参数二,是否以字典方式提取。为true时返回:{'字段1':'值1','字段2':'值2'}def get_rows(self, size=None, is_dict=True):if size is None:rows = self.cursor.fetchall()else:rows = self.cursor.fetchmany(size)if rows is None:rows = []if is_dict:dict_rows = []dict_keys = [r[0].lower() for r in self.cursor.description]for row in rows:dict_rows.append(dict(zip(dict_keys, row)))rows = dict_rowsreturn rows# 获取更改记录数def _get_rows_num(self):return self.cursor.rowcount# 提交def commit(self):self.conn.commit()# 回滚def rollback(self):self.conn.rollback();# 销毁def __del__(self):self.close()# 关闭连接def close(self):# self.commit()
        self.cursor.close()self.conn.close()if __name__ == '__main__':# exampleora = Oracle_util()create_table = """CREATE TABLE python_modules (module_name VARCHAR2(50) NOT NULL,file_path VARCHAR2(300) NOT NULL,china_name VARCHAR2(300) NOT NULL)"""# 执行创建表create_flag = ora.execute(create_table)# 得到表所有列print(ora.get_columns('python_modules'))# 添加模块信息M = []for m_name, m_info in modules.items():try:M.append((m_name, m_info.__file__, '中国'))except AttributeError:passprint(len(M))print(ora.insertMany('python_modules',['module_name', 'file_path','china_name'],M))ora.commit()print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))print(ora.updateByTable(table='python_modules',column_dict={'china_name':'北京'},cond_dict={'module_name':'DBUtils.PooledDB'}))ora.commit()print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))print(ora.deleteByTable(table='python_modules', cond_dict={'module_name': 'DBUtils.PooledDB'}))ora.commit()print(ora.queryBySql(sql="select module_name,china_name from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))ora.updateByTable(table='python_modules', column_dict={'china_name': '河北'})ora.commit()ora.exportTxt("a.txt", sql="select * from python_modules", )print(ora.deleteByTable(table='python_modules'))ora.commit()print(ora.queryByTable(table="python_modules"))ora.execute("DROP TABLE python_modules PURGE")print(ora.callfunc('myfunc', cx_Oracle.NUMBER, ('abc', 2)))print(ora.callproc('myproc',  (3,)))print(ora.queryByTable(table="ptab",column='mydata, myid',cond_dict={'myid':2}))

转载于:https://www.cnblogs.com/xiao-apple36/p/9071553.html

Python--DBUtil相关推荐

  1. Python DbUtil操作数据

    1.DbUtil操作类 #!/usr/bin/python import pymysql; ''' 数据库操作类 ''' class DbUtil: dbconnect = None; hostnam ...

  2. python mysql dbutil_python环境测试MySQLdb、DBUtil、sqlobject性能

    首先介绍下MySQLdb.DBUtil.sqlobject: (1)MySQLdb 是用于Python连接Mysql数据库的接口,它实现了 Python 数据库 API 规范 V2.0,基于 MySQ ...

  3. Python的scrapy之爬取顶点小说网的所有小说

    闲来无事用Python的scrapy框架练练手,爬取顶点小说网的所有小说的详细信息. 看一下网页的构造: tr标签里面的 td 使我们所要爬取的信息 下面是我们要爬取的二级页面 小说的简介信息: 下面 ...

  4. python之数据库连接

    概述 前几日,闲来无事,写了一个python来连接数据库的模板案例,仿照传统MVC的例子.代码如下: DBUtil import pymysqlclass DBUtil:__db = None__ob ...

  5. 【源码阅读】dbutil包中BasicRowProcessor内部类CaseInsensiti...

    2019独角兽企业重金招聘Python工程师标准>>> 本文就是对我问的问题:http://www.oschina.net/question/1029535_127629?p=1#A ...

  6. python 监听键盘输入并收集数据进行分析

    文章目录 1.键盘监听库pynput 2.进行键盘输入的数据保存 3.从键盘监听中我们能看出什么 3.1 疯狂的ctrl=读代码ing 3.2 看起来有意义的字符串并以Key.enter结尾≈输入某个 ...

  7. 【视频教程免费领取】聚焦Python分布式爬虫必学框架Scrapy 打造搜索引擎

    领取方式 关注公众号,发送Python0407获取下载链接. 扫码关注公众号,公众号回复 Python0407 获取下载地址 目录结构 目录:/读书ReadBook [57.6G] ┣━━48G全套J ...

  8. 用Python语言绘制股市OBV指标效果

    我的新书<基于股票大数据分析的Python入门实战>于近日上架,在这篇博文向大家介绍我的新书:<基于股票大数据分析的Python入门实战>里,介绍了这本书的内容.这里将摘录出部 ...

  9. python oracle 工具类,python连接Oracle工具类

    上代码: # -*- coding:utf-8 -*- import cx_Oracle import pandas as pd class ORACLE(object): def __init__( ...

  10. python生成零矩阵_python 实现矩阵填充0的例子

    python 实现矩阵填充0的例子 需求: 原矩阵 [[1 2 3] [4 5 6] [7 8 9]] 在原矩阵元素之间填充元素 0,得到 [[1. 0. 2. 0. 3.] [0. 0. 0. 0. ...

最新文章

  1. 自动化运维之CentOS7下PXE+Kickstart+DHCP+TFTP+HTTP无人值守安装系统
  2. 架构师之路 — 业务架构 — Overview
  3. 异常:catch下的return;
  4. gethostbyname()函数:通过域名获取IP地址
  5. k8s 更改pod数量限制(默认每个节点最多110组pod)0/3 nodes are available: 3 Insufficient cpu报错排查
  6. 隐藏马尔科夫模型HMM
  7. FAT和EXFAT文件系统
  8. ETL异构数据源Datax_使用querySql_08
  9. AspxTreeList获取选中项的值
  10. Angular 分页
  11. 解决方案-电子签章:金格科技
  12. 摄氏度和开氏度的换算_开尔文与摄氏度的换算关系
  13. React Native Android 8081端口占用问题
  14. 【其他工具】亲戚关系计算器
  15. 网站监控程序uptime-kuma,宝塔面板搭建 ,TCP/HTTP监控
  16. [Windows系统]查看电脑开关机时间
  17. 用计算机制作演示文稿教案博客,《制作多媒体演示文稿》教学案例
  18. 打造数字化转型IT生态系统的IT管理方法:IT4IT一瞥
  19. ViewPager(一屏多页、无限滑动、自动切换)
  20. 20190311 Windows上ZooKeeper伪集群的实现

热门文章

  1. 阮一峰:jQuery官方基础教程笔记
  2. Spring注解源码分析
  3. [pytorch、学习] - 5.2 填充和步幅
  4. ES5-5 参数默认值、递归、预编译、暗示全局变量
  5. Java Web应用的生命周期
  6. HP P2000 RAID-5两块盘离线的数据恢复报告
  7. 将团队迁移到可视化项目管理软件
  8. PHP自动测试框架Top 10
  9. SQLite轻量级数据库,操作数据常用语句
  10. 20100519 学习记录:asp CreateFolder/上传附件