#!/usr/bin/env python
# -*- coding: utf-8 -*-"""
设计db模块的原因:1. 更简单的操作数据库一次数据访问:   数据库连接 => 游标对象 => 执行SQL => 处理异常 => 清理资源。db模块对这些过程进行封装,使得用户仅需关注SQL执行。2. 数据安全用户请求以多线程处理时,为了避免多线程下的数据共享引起的数据混乱,需要将数据连接以ThreadLocal对象传入。
设计db接口:1.设计原则:根据上层调用者设计简单易用的API接口2. 调用接口1. 初始化数据库连接信息create_engine封装了如下功能:1. 为数据库连接 准备需要的配置信息2. 创建数据库连接(由生成的全局对象engine的 connect方法提供)from transwarp import dbdb.create_engine(user='root',password='password',database='test',host='127.0.0.1',port=3306)2. 执行SQL DMLselect 函数封装了如下功能:1.支持一个数据库连接里执行多个SQL语句2.支持链接的自动获取和释放使用样例:users = db.select('select * from user')# users =># [#     { "id": 1, "name": "Michael"},#     { "id": 2, "name": "Bob"},#     { "id": 3, "name": "Adam"}# ]3. 支持事物transaction 函数封装了如下功能:1. 事务也可以嵌套,内层事务会自动合并到外层事务中,这种事务模型足够满足99%的需求
"""import time
import uuid
import functools
import threading
import logging# global engine object:
engine = Nonedef next_id(t=None):"""生成一个唯一id   由 当前时间 + 随机数(由伪随机数得来)拼接得到"""if t is None:t = time.time()return '%015d%s000' % (int(t * 1000), uuid.uuid4().hex)def _profiling(start, sql=''):"""用于剖析sql的执行时间"""t = time.time() - startif t > 0.1:logging.warning('[PROFILING] [DB] %s: %s' % (t, sql))else:logging.info('[PROFILING] [DB] %s: %s' % (t, sql))def create_engine(user, password, database, host='127.0.0.1', port=3306, **kw):"""db模型的核心函数,用于连接数据库, 生成全局对象engine,engine对象持有数据库连接"""import mysql.connectorglobal engineif engine is not None:raise DBError('Engine is already initialized.')params = dict(user=user, password=password, database=database, host=host, port=port)defaults = dict(use_unicode=True, charset='utf8', collation='utf8_general_ci', autocommit=False)for k, v in defaults.iteritems():params[k] = kw.pop(k, v)params.update(kw)params['buffered'] = Trueengine = _Engine(lambda: mysql.connector.connect(**params))# test connection...logging.info('Init mysql engine <%s> ok.' % hex(id(engine)))def connection():"""db模块核心函数,用于获取一个数据库连接通过_ConnectionCtx对 _db_ctx封装,使得惰性连接可以自动获取和释放,也就是可以使用 with语法来处理数据库连接_ConnectionCtx    实现with语法^|_db_ctx           _DbCtx实例^|_DbCtx            获取和释放惰性连接^|_LasyConnection   实现惰性连接"""return _ConnectionCtx()def with_connection(func):"""设计一个装饰器 替换with语法,让代码更优雅比如:@with_connectiondef foo(*args, **kw):f1()f2()f3()"""@functools.wraps(func)def _wrapper(*args, **kw):with _ConnectionCtx():return func(*args, **kw)return _wrapperdef transaction():"""db模块核心函数 用于实现事物功能支持事物:with db.transaction():db.select('...')db.update('...')db.update('...')支持事物嵌套:with db.transaction():transaction1transaction2..."""return _TransactionCtx()def with_transaction(func):"""设计一个装饰器 替换with语法,让代码更优雅比如:@with_transactiondef do_in_transaction():>>> @with_transaction... def update_profile(id, name, rollback):...     u = dict(id=id, name=name, email='%s@test.org' % name, passwd=name, last_modified=time.time())...     insert('user', **u)...     update('update user set passwd=? where id=?', name.upper(), id)...     if rollback:...         raise StandardError('will cause rollback...')>>> update_profile(8080, 'Julia', False)>>> select_one('select * from user where id=?', 8080).passwdu'JULIA'>>> update_profile(9090, 'Robert', True)Traceback (most recent call last):...StandardError: will cause rollback..."""@functools.wraps(func)def _wrapper(*args, **kw):start = time.time()with _TransactionCtx():func(*args, **kw)_profiling(start)return _wrapper@with_connection
def _select(sql, first, *args):"""执行SQL,返回一个结果 或者多个结果组成的列表"""global _db_ctxcursor = Nonesql = sql.replace('?', '%s')logging.info('SQL: %s, ARGS: %s' % (sql, args))try:cursor = _db_ctx.connection.cursor()cursor.execute(sql, args)if cursor.description:names = [x[0] for x in cursor.description]if first:values = cursor.fetchone()if not values:return Nonereturn Dict(names, values)return [Dict(names, x) for x in cursor.fetchall()]finally:if cursor:cursor.close()def select_one(sql, *args):"""执行SQL 仅返回一个结果如果没有结果 返回None如果有1个结果,返回一个结果如果有多个结果,返回第一个结果>>> u1 = dict(id=100, name='Alice', email='alice@test.org', passwd='ABC-12345', last_modified=time.time())>>> u2 = dict(id=101, name='Sarah', email='sarah@test.org', passwd='ABC-12345', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> u = select_one('select * from user where id=?', 100)>>> u.nameu'Alice'>>> select_one('select * from user where email=?', 'abc@email.com')>>> u2 = select_one('select * from user where passwd=? order by email', 'ABC-12345')>>> u2.nameu'Alice'"""return _select(sql, True, *args)def select_int(sql, *args):"""执行一个sql 返回一个数值,注意仅一个数值,如果返回多个数值将触发异常>>> u1 = dict(id=96900, name='Ada', email='ada@test.org', passwd='A-12345', last_modified=time.time())>>> u2 = dict(id=96901, name='Adam', email='adam@test.org', passwd='A-12345', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> select_int('select count(*) from user')5>>> select_int('select count(*) from user where email=?', 'ada@test.org')1>>> select_int('select count(*) from user where email=?', 'notexist@test.org')0>>> select_int('select id from user where email=?', 'ada@test.org')96900>>> select_int('select id, name from user where email=?', 'ada@test.org')Traceback (most recent call last):...MultiColumnsError: Expect only one column."""d = _select(sql, True, *args)if len(d) != 1:raise MultiColumnsError('Expect only one column.')return d.values()[0]def select(sql, *args):"""执行sql 以列表形式返回结果>>> u1 = dict(id=200, name='Wall.E', email='wall.e@test.org', passwd='back-to-earth', last_modified=time.time())>>> u2 = dict(id=201, name='Eva', email='eva@test.org', passwd='back-to-earth', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> L = select('select * from user where id=?', 900900900)>>> L[]>>> L = select('select * from user where id=?', 200)>>> L[0].emailu'wall.e@test.org'>>> L = select('select * from user where passwd=? order by id desc', 'back-to-earth')>>> L[0].nameu'Eva'>>> L[1].nameu'Wall.E'"""return _select(sql, False, *args)@with_connection
def _update(sql, *args):"""执行update 语句,返回update的行数"""global _db_ctxcursor = Nonesql = sql.replace('?', '%s')logging.info('SQL: %s, ARGS: %s' % (sql, args))try:cursor = _db_ctx.connection.cursor()cursor.execute(sql, args)r = cursor.rowcountif _db_ctx.transactions == 0:# no transaction enviroment:logging.info('auto commit')_db_ctx.connection.commit()return rfinally:if cursor:cursor.close()def update(sql, *args):"""执行update 语句,返回update的行数>>> u1 = dict(id=1000, name='Michael', email='michael@test.org', passwd='123456', last_modified=time.time())>>> insert('user', **u1)1>>> u2 = select_one('select * from user where id=?', 1000)>>> u2.emailu'michael@test.org'>>> u2.passwdu'123456'>>> update('update user set email=?, passwd=? where id=?', 'michael@example.org', '654321', 1000)1>>> u3 = select_one('select * from user where id=?', 1000)>>> u3.emailu'michael@example.org'>>> u3.passwdu'654321'>>> update('update user set passwd=? where id=?', '***', '123')0"""return _update(sql, *args)def insert(table, **kw):"""执行insert语句>>> u1 = dict(id=2000, name='Bob', email='bob@test.org', passwd='bobobob', last_modified=time.time())>>> insert('user', **u1)1>>> u2 = select_one('select * from user where id=?', 2000)>>> u2.nameu'Bob'>>> insert('user', **u2)Traceback (most recent call last):...IntegrityError: 1062 (23000): Duplicate entry '2000' for key 'PRIMARY'"""cols, args = zip(*kw.iteritems())sql = 'insert into `%s` (%s) values (%s)' % (table, ','.join(['`%s`' % col for col in cols]), ','.join(['?' for i in range(len(cols))]))return _update(sql, *args)class Dict(dict):"""字典对象实现一个简单的可以通过属性访问的字典,比如 x.key = value"""def __init__(self, names=(), values=(), **kw):super(Dict, self).__init__(**kw)for k, v in zip(names, values):self[k] = vdef __getattr__(self, key):try:return self[key]except KeyError:raise AttributeError(r"'Dict' object has no attribute '%s'" % key)def __setattr__(self, key, value):self[key] = valueclass DBError(Exception):passclass MultiColumnsError(DBError):passclass _Engine(object):"""数据库引擎对象用于保存 db模块的核心函数:create_engine 创建出来的数据库连接"""def __init__(self, connect):self._connect = connectdef connect(self):return self._connect()class _LasyConnection(object):"""惰性连接对象仅当需要cursor对象时,才连接数据库,获取连接"""def __init__(self):self.connection = Nonedef cursor(self):if self.connection is None:_connection = engine.connect()logging.info('[CONNECTION] [OPEN] connection <%s>...' % hex(id(_connection)))self.connection = _connectionreturn self.connection.cursor()def commit(self):self.connection.commit()def rollback(self):self.connection.rollback()def cleanup(self):if self.connection:_connection = self.connectionself.connection = Nonelogging.info('[CONNECTION] [CLOSE] connection <%s>...' % hex(id(connection)))_connection.close()class _DbCtx(threading.local):"""db模块的核心对象, 数据库连接的上下文对象,负责从数据库获取和释放连接取得的连接是惰性连接对象,因此只有调用cursor对象时,才会真正获取数据库连接该对象是一个 Thread local对象,因此绑定在此对象上的数据 仅对本线程可见"""def __init__(self):self.connection = Noneself.transactions = 0def is_init(self):"""返回一个布尔值,用于判断 此对象的初始化状态"""return self.connection is not Nonedef init(self):"""初始化连接的上下文对象,获得一个惰性连接对象"""logging.info('open lazy connection...')self.connection = _LasyConnection()self.transactions = 0def cleanup(self):"""清理连接对象,关闭连接"""self.connection.cleanup()self.connection = Nonedef cursor(self):"""获取cursor对象, 真正取得数据库连接"""return self.connection.cursor()# thread-local db context:
_db_ctx = _DbCtx()class _ConnectionCtx(object):"""因为_DbCtx实现了连接的 获取和释放,但是并没有实现连接的自动获取和释放,_ConnectCtx在 _DbCtx基础上实现了该功能,因此可以对 _ConnectCtx 使用with 语法,比如:with connection():passwith connection():pass"""def __enter__(self):"""获取一个惰性连接对象"""global _db_ctxself.should_cleanup = Falseif not _db_ctx.is_init():_db_ctx.init()self.should_cleanup = Truereturn selfdef __exit__(self, exctype, excvalue, traceback):"""释放连接"""global _db_ctxif self.should_cleanup:_db_ctx.cleanup()class _TransactionCtx(object):"""事务嵌套比Connection嵌套复杂一点,因为事务嵌套需要计数,每遇到一层嵌套就+1,离开一层嵌套就-1,最后到0时提交事务"""def __enter__(self):global _db_ctxself.should_close_conn = Falseif not _db_ctx.is_init():# needs open a connection first:_db_ctx.init()self.should_close_conn = True_db_ctx.transactions += 1logging.info('begin transaction...' if _db_ctx.transactions == 1 else 'join current transaction...')return selfdef __exit__(self, exctype, excvalue, traceback):global _db_ctx_db_ctx.transactions -= 1try:if _db_ctx.transactions == 0:if exctype is None:self.commit()else:self.rollback()finally:if self.should_close_conn:_db_ctx.cleanup()def commit(self):global _db_ctxlogging.info('commit transaction...')try:_db_ctx.connection.commit()logging.info('commit ok.')except:logging.warning('commit failed. try rollback...')_db_ctx.connection.rollback()logging.warning('rollback ok.')raisedef rollback(self):global _db_ctxlogging.warning('rollback transaction...')_db_ctx.connection.rollback()logging.info('rollback ok.')if __name__ == '__main__':logging.basicConfig(level=logging.DEBUG)create_engine('www-data', 'www-data', 'test', '192.168.10.128')update('drop table if exists user')update('create table user (id int primary key, name text, email text, passwd text, last_modified real)')import doctestdoctest.testmod()
												

python下的mysql模块包装相关推荐

  1. python学习之-- mysql模块和sqlalchemy模块

    简单介绍python下操作mysql数据库模块有2个:pyhton-mysqldb  和 pymysql 说明:在python3中支持mysql 的模块已经使用pymysql替代了MysqlDB(这个 ...

  2. Python下使用tarfile模块来实现文件归档压缩与解压

    Python下使用tarfile模块来实现文件归档压缩与解压   部分转自:http://www.diybl.com/course/3_program/python/20110510/555345.h ...

  3. Python下使用optparse模块实现对多个文件进行统计【二】

    一个取代shell wc -l 命令的python小脚本 1.通过python下optparse模块下OptionParser类是新对文件的统计 #!/opt/data/ipy/bin/python ...

  4. python下的spectral模块(高光谱图像处理)

    Spectral Python (SPy)是一个用于处理高光谱图像数据的纯Python模块.它具有读取.显示.操作和分类高光谱图像的功能. 之所以用它是因为这个对多波段图像的支持更好 参考 一.SPy ...

  5. python下保持mysql连接,避免“MySQL server has gone away“方法

    因需要对saltstack的所有动作进行入库采集,网上采集脚本mysql连接会因超时而断开,导致守护进程在下一次采集数据时提示: Traceback (most recent call last):F ...

  6. Python下安装Pywifi进行WiFi密码破解

    Python下安装Pywifi进行WiFi密码破解 能成功的在Python下安装Pywifi模块的方法及软件 前段时间在网上看了一些关于Python下安装Pywifi模块的方法,很多都是安装不成功的, ...

  7. python下tkinter模块和mysql构建图书管理系统实验

    前言与准备 本次实验是大二数据库期末大作业,我采用的是python语言和mysql8.0.27制作的基于cs构架的数据库框架,由于之前从未了解过python语言,所以代码部分语言较为粗暴,未成年人请在 ...

  8. Python调用MySQL模块初试

    学Python喊了很长时间了,总是因为各种各样的理由搁置,昨天想起来前同事推荐过一本Python的书<Python核心编程>第二版,就火速买了一本,Python的学习也算是个开始了. 当然 ...

  9. liunx mysql模块_linux下安装MySQLdb模块_MySQL

    bitsCN.com linux下安装MySQLdb模块 1,查看是否已安装MySQLdb模块 进入python的命令行,输入 import MySQLdb 如果没有报错,证明此模块已经安装,可以跳过 ...

最新文章

  1. python 读取csv文件生成散点图
  2. 对于位置无关代码的理解
  3. SpringMVC学习(四)——Spring使用到的设计模式分析
  4. JPA的entityManager的find、getReference、persisit、remove方法的使用
  5. Android Treble 计划技术文档
  6. 云点播网页版_微软宣布:免费开放微软云办公、云桌面、云远程方案
  7. 理解Joomla!模板
  8. hashmap clone_Java HashMap clone()方法与示例
  9. c语言超长整数加法计算,两个超长正整数的加法
  10. CTF SQL注入知识点
  11. Python __init__.py 作用详解
  12. icem密度盒怎么设置_ICEM-自动体网格生成[精].ppt
  13. 转载——傅里叶变换概念及公式推导
  14. AIR应用:二维码批量生成
  15. nginx_centos
  16. Rootkit技术基础(4)
  17. EDU教育版Office365使用教程(二):桌面版Office365下载安装
  18. deepin RTX2060 GTX1050 分辨率 1024 x 768 驱动
  19. 摩拜ofo补贴战熄火 月卡大涨价网店打折卖
  20. 『津津乐道播客』世界艾滋病日话题征集

热门文章

  1. 一个不知名的网站复制来的: java怎样连接到SQL server 2008
  2. 一个API接口的例子,包括单元测试
  3. WebService技术详解CXF
  4. mysql主键unsigned_mysql – 主键应该总是unsigned?
  5. java中的关键字有哪些_Java关键字有哪些?
  6. 2021年广西艺术高考成绩查询,2021年广西美术高考成绩查询网址:https://www.gxeea.cn/...
  7. java 数组 截取_Java成长孵化园---认识java(day09)
  8. MySQL————表维护相关低频操作总结
  9. 个人生活助手app_美居App 6版重磅升级 开启智慧生活新范式
  10. html 自定义打印模板,HTML+CSS入门 自定义模板详解