#!/usr/bin/env python3
# -*- coding: utf-8 -*-#
#                        Python3 ORM hacking
# 说明:
#     之前分析了一个Python2 ORM的源代码,这次分析一个Python3的源代码,在写法上
# 还是又挺大的区别的。#                                        2016-10-22 深圳 南山平山村 曾剑锋
#
# 源码:
#     https://github.com/michaelliao/awesome-python3-webapp/tree/day-03
#
# 参考文章:
#     1. python logging模块使用教程
#         http://www.jianshu.com/p/feb86c06c4f4
#     2. Python async/await入门
#         https://ipfans.github.io/2015/08/introduction-to-async-and-await/
#     3. 浅析python的metaclass
#         http://jianpx.iteye.com/blog/908121
#     4. Why I got ignored exception when I use aiomysql in python 3.5 #59
#         https://github.com/aio-libs/aiomysql/issues/59
# __author__ = 'Michael Liao'import asyncio, loggingimport aiomysql# SQL日志打印输出模板
def log(sql, args=()):logging.info('SQL: %s' % sql)# 创建数据库连接池
async def create_pool(loop, **kw):logging.info('create database connection pool...')# 标记__pool为文件内全局变量,在其他函数内可以直接访问global __pool__pool = await aiomysql.create_pool(host=kw.get('host', 'localhost'),port=kw.get('port', 3306),user=kw['user'],password=kw['password'],db=kw['db'],charset=kw.get('charset', 'utf8'),autocommit=kw.get('autocommit', True),maxsize=kw.get('maxsize', 10),minsize=kw.get('minsize', 1),loop=loop)# 数据库查询
async def select(sql, args, size=None):# 输出SQL日志信息log(sql, args)global __pool# 从连接池中获取连接,aysnc是异步获取连接async with __pool.get() as conn:async with conn.cursor(aiomysql.DictCursor) as cur:# 合成实际的SQLawait cur.execute(sql.replace('?', '%s'), args or ())# 根据size来获取数据多少行记录if size:rs = await cur.fetchmany(size)else:rs = await cur.fetchall()# 给出获取到的信息条数logging.info('rows returned: %s' % len(rs))return rs# 数据库直接执行SQL
async def execute(sql, args, autocommit=True):log(sql)async with __pool.get() as conn:# 如果不是自动提交if not autocommit:await conn.begin()try:async with conn.cursor(aiomysql.DictCursor) as cur:await cur.execute(sql.replace('?', '%s'), args)# 返回的执行SQL后有效行数,从代码上可以看出,这部分主要是执行更新、插入、删除等SQL语句affected = cur.rowcount# 完成提交工作if not autocommit:await conn.commit()except BaseException as e:# 出现问题,回滚if not autocommit:await conn.rollback()raise       # 直接再次抛出异常return affected # 返回有效行数# 合成可替代参数字符串,先使用'?'代替'%s'
def create_args_string(num):L = []for n in range(num):L.append('?')return ', '.join(L)# 对应数据库中每一个字段的一个域的基类
class Field(object):# 域名、域类型、是否是主键、默认值def __init__(self, name, column_type, primary_key, default):self.name = nameself.column_type = column_typeself.primary_key = primary_keyself.default = default# 重写默认输出的str函数def __str__(self):return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)# 字符串类型的域
class StringField(Field):def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):super().__init__(name, ddl, primary_key, default)# Boolean类型的域
class BooleanField(Field):def __init__(self, name=None, default=False):super().__init__(name, 'boolean', False, default)# 整形类型的域
class IntegerField(Field):def __init__(self, name=None, primary_key=False, default=0):super().__init__(name, 'bigint', primary_key, default)# 浮点类型的域
class FloatField(Field):def __init__(self, name=None, primary_key=False, default=0.0):super().__init__(name, 'real', primary_key, default)# 文本类型的域
class TextField(Field):def __init__(self, name=None, default=None):super().__init__(name, 'text', False, default)# MVC中的Model的元类,主要用于自动生成映射(map)类
class ModelMetaclass(type):# name: 类的名字# bases: 基类,通常是tuple类型# attrs: dict类型,就是类的属性或者函数def __new__(cls, name, bases, attrs):# 过滤掉Model类直接生成的实例类if name=='Model':return type.__new__(cls, name, bases, attrs)# 从类的属性中获取__table__,其实也就是于数据库对应的表名,如果不存在那么就是等于类名tableName = attrs.get('__table__', None) or namelogging.info('found model: %s (table: %s)' % (name, tableName))# 创建映射字典mappings = dict()# 域listfields = []# 主键标记primaryKey = None# 获取类中的所有的键值对for k, v in attrs.items():# 选择Field类型实例的属性作为映射键值if isinstance(v, Field):logging.info('  found mapping: %s ==> %s' % (k, v))# 将当前的键值对放入mapping中mappings[k] = vif v.primary_key:# 防止出现两个、两个以上的主键if primaryKey:raise StandardError('Duplicate primary key for field: %s' % k)primaryKey = kelse:# 将key添加进入fields中,也就是映射类中的属性和数据库中的表的域, 这里面不包含主键fields.append(k)# 前面可能没有找到主键,提示一下if not primaryKey:raise StandardError('Primary key not found.')# 删除这些类属性 防止访问实例属性的时候发生错误,因为实例属性优先级大于类属性for k in mappings.keys():attrs.pop(k)escaped_fields = list(map(lambda f: '`%s`' % f, fields))attrs['__mappings__'] = mappings        # 保存属性和列的映射关系attrs['__table__'] = tableName          # 表名attrs['__primary_key__'] = primaryKey   # 主键属性名attrs['__fields__'] = fields            # 除主键外的属性名# 生成查询SQLattrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)# 生成插入SQLattrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))# 生成更新SQLattrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)# 生成删除SQLattrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)# 调用type生成类return type.__new__(cls, name, bases, attrs)# 继承自ModelMetaclass元类、dict的类
class Model(dict, metaclass=ModelMetaclass):def __init__(self, **kw):super(Model, self).__init__(**kw)# 重写get方法def __getattr__(self, key):try:return self[key]except KeyError:raise AttributeError(r"'Model' object has no attribute '%s'" % key)# 重写set方法def __setattr__(self, key, value):self[key] = value# 重写get方法def getValue(self, key):return getattr(self, key, None)# 获取值,当不存在的时候获取的是默认值def getValueOrDefault(self, key):value = getattr(self, key, None)if value is None:field = self.__mappings__[key]if field.default is not None:value = field.default() if callable(field.default) else field.defaultlogging.debug('using default value for %s: %s' % (key, str(value)))setattr(self, key, value)return value@classmethodasync def findAll(cls, where=None, args=None, **kw):' find objects by where clause. '# 获取元类自动生成的SQL语句,并根据当前的参数,继续合成sql = [cls.__select__]if where:sql.append('where')sql.append(where)if args is None:args = []orderBy = kw.get('orderBy', None)if orderBy:sql.append('order by')sql.append(orderBy)limit = kw.get('limit', None)if limit is not None:sql.append('limit')if isinstance(limit, int):sql.append('?')args.append(limit)elif isinstance(limit, tuple) and len(limit) == 2:sql.append('?, ?')args.extend(limit)else:raise ValueError('Invalid limit value: %s' % str(limit))# 直接调用select函数来处理, 这里是等待函数执行完成函数才能返回rs = await select(' '.join(sql), args)# 该类本身是字典,自己用自己生成新的实例,里面的阈值正好也是需要查询return [cls(**r) for r in rs]@classmethodasync def findNumber(cls, selectField, where=None, args=None):' find number by select and where. '# 这里的 _num_ 什么意思?别名? 我估计是mysql里面一个记录实时查询结果条数的变量sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]if where:sql.append('where')sql.append(where)rs = await select(' '.join(sql), args, 1)if len(rs) == 0:return Nonereturn rs[0]['_num_']@classmethodasync def find(cls, pk):' find object by primary key. '# 通过主键查找对象, 如果不存在,那么就返回Noners = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)if len(rs) == 0:return Nonereturn cls(**rs[0])# 插入语句对应的方法async def save(self):args = list(map(self.getValueOrDefault, self.__fields__))args.append(self.getValueOrDefault(self.__primary_key__))rows = await execute(self.__insert__, args)if rows != 1:logging.warn('failed to insert record: affected rows: %s' % rows)# 更新语句对应的方法async def update(self):args = list(map(self.getValue, self.__fields__))args.append(self.getValue(self.__primary_key__))rows = await execute(self.__update__, args)if rows != 1:logging.warn('failed to update by primary key: affected rows: %s' % rows)# 删除语句对应的方法async def remove(self):args = [self.getValue(self.__primary_key__)]rows = await execute(self.__delete__, args)if rows != 1:logging.warn('failed to remove by primary key: affected rows: %s' % rows)

Python3 ORM hacking相关推荐

  1. oracle视图如何创建索引,ORACLE 创建视图索引序列

    /* 视图View 视图是从若干基本表和(或)其他视图构造出来的表 视图存放的都是查询语句,并没有真实的数据 虚表 作用 限制对数据的操作 复杂查询变简单 提供相同数据的不同显示 UNION ALL ...

  2. python3数据库框架_Python3 MySQL 数据库连接:安装pymysql(mysql数据库驱动), sqlalchemy(ORM框架)。...

    Python3 MySQL 数据库连接 python3使用mysql作为数据库,安装pymysql作为驱动,然后安装sqlalchemy框架 PyMySQL 驱动 安装: $ python3 -m p ...

  3. python3 数据库操作 orm sqlalchemy 简介

    ORM 全称 Object Relational Mapping, 翻译过来叫对象关系映射.简单的说,ORM 将数据库中的表与面向对象语言中的类建立了一种对应关系.这样,我们要操作数据库,数据库中的表 ...

  4. python数据库框架_Python数据库及ORM框架对比选择

    使用Python进行MySQL的库主要有三个: Python-MySQL(更熟悉的名字可能是MySQLdb), PyMySQL SQLAlchemy. Python-MySQL: 资格最老,核心由C语 ...

  5. python代码创建数据库_如何使用python ORM创建数据库表?

    首先同大家说了语言的全方面知识,基本上各个位置点都有涉及,不知道大家有没有学到知识点呢?小编还是习惯跟大家说个总结,这样大家才能抓住重点,今天继续来学习下关于Django框架中ORM的使用,主要的作用 ...

  6. python安装orm_Python ORM框架之 Peewee入门

    之前在学Django时,发现它的模型层非常好用,把对数据库的操作映射成对类.对象的操作,避免了我们直接写在Web项目中SQL语句,当时想,如果这个模型层可以独立出来使用就好了,那我们平台操作数据库也可 ...

  7. Django之初步实现登录功能,APP及ORM

    form表单提交数据注意事项 1.注意form标签的书写,form标签必须有method属性和action属性 2.所有获取用户输入的表单标签(如input等)需放在<form></ ...

  8. Python3搭建Django框架浅析

    前言 Python下有许多款不同的 Web 框架.Django是重量级选手中最有代表性的一位.许多成功的网站和APP都基于Django. Django 是一个开放源代码的 Web 应用框架,由 Pyt ...

  9. 第四十四章 Django ORM

    1. 路由系统   def test():    pass   url(r'^test$', test)      创建app:    python3 mange.py startapp app名称 ...

最新文章

  1. jenkins pipeline脚本_Jenkins 创建流水线 (Pipeline) 项目的脚本
  2. 玩爬虫不会登陆?这个工具拿走不谢!
  3. c语言e怎么表示_C语言程序设计(山东联盟)
  4. 学习的本质在于触发了你的思考
  5. web前端培训分享Electron之IPC 通信
  6. 4.Linux的目录结构
  7. python变量的作用_python 变量的作用范围
  8. 软件定义的数据中心已经来临
  9. python基础实例 韦玮 pdf_Python基础实例教程(微课版)
  10. VC/VS开发问题集锦
  11. JQuery DataTable的配置项及事件
  12. More Photos
  13. 【CNN】多角度理解CNN
  14. 最少钱币数-1-贪心算法(错,或者叫有问题)-CCF-CSP练习题(50)
  15. 信号系统与数字信号处理一点点心得
  16. PS(Photo Shop Cs6)批量调整图片大小
  17. FLV视频合并-JAVA代码
  18. 红轴和茶轴哪个声音大 红轴和茶轴哪个适合打字
  19. java 通话录音_Java 实现麦克风自动录音
  20. zookeeper leader和learner的数据同步

热门文章

  1. 函数调用的汇编语言详解
  2. 开课吧Java课堂:如何通过接口引用实现接口?
  3. canvas学习笔记-贝塞尔曲线
  4. 如何区分广播风暴和网络环路?
  5. man statd(rpc.statd中文手册)
  6. 《信息存储与管理(第二版):数字信息的存储、管理和保护》—— 2.6 磁盘驱动部件...
  7. Linux 操作系统启动流程以及trouble shooting
  8. 炒冷饭系列:设计模式 单例模式
  9. 获得执行计划方法三-sql_trace
  10. 【编译打包】drbd 8.4.2