超实用的 Python 技巧,异步操作数据库!
Python
目前已经进化到了3.8版本,对操作数据库也提供了相应的异步支持。当我们做一个Web
服务时,性能的瓶颈绝大部分都在数据库上,如果一个请求从数据库中读数据的时候能够自动切换、去处理其它请求的话,是不是就能提高并发量了呢。
(编者注:原文写于2020年2月,当时最新为Python3.8
,文章内容现在仍未过时)
下面我们来看看如何使用Python
异步操作MySQL
、PostgreSQL
以及Redis
,以上几个可以说是最常用的数据库了。至于SQLServer
、Oracle
,本人没有找到相应的异步驱动,有兴趣可以自己去探索一下。
而操作数据库无非就是增删改查,下面我们来看看如何异步实现它们。
异步操作MySQL
异步操作 MySQL
的话,需要使用一个aiomysql
,直接 pip install aiomysql
即可。
aiomysql
底层依赖于pymysql
,所以aiomysql
并没有单独实现相应的连接驱动,而是在pymysql
之上进行了封装。
查询记录
下面先来看看如何查询记录。
import asyncio
import aiomysql.sa as aio_saasync def main():# 创建一个异步引擎engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10)# 通过 engine.acquire() 获取一个连接async with engine.acquire() as conn:# 异步执行, 返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象result = await conn.execute("SELECT * FROM girl")# 通过 await result.fetchone() 可以获取满足条件的第一条记录, 一个 <class 'aiomysql.sa.result.RowProxy'> 对象data = await result.fetchone()# 可以将 <class 'aiomysql.sa.result.RowProxy'> 对象想象成一个字典print(data.keys()) # KeysView((1, '古明地觉', 16, '地灵殿'))print(list(data.keys())) # ['id', 'name', 'age', 'place']print(data.values()) # ValuesView((1, '古明地觉', 16, '地灵殿'))print(list(data.values())) # [1, '古明地觉', 16, '地灵殿']print(data.items()) # ItemsView((1, '古明地觉', 16, '地灵殿'))print(list(data.items())) # [('id', 1), ('name', '古明地觉'), ('age', 16), ('place', '地灵殿')]# 直接转成字典也是可以的print(dict(data)) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}# 最后别忘记关闭引擎, 当然你在创建引擎的时候也可以通过 async with aio_sa.create_engine 的方式创建# async with 语句结束后会自动执行下面两行代码engine.close()await engine.wait_closed()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
怎么样,是不是很简单呢,和同步库的操作方式其实是类似的。但是很明显,我们在获取记录的时候不会只获取一条,而是会获取多条,获取多条的话使用 await result.fetchall()
即可。
import asyncio
from pprint import pprint
import aiomysql.sa as aio_saasync def main():# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:result = await conn.execute("SELECT * FROM girl")# 此时的 data 是一个列表, 列表里面是 <class 'aiomysql.sa.result.RowProxy'> 对象data = await result.fetchall()# 将里面的元素转成字典pprint(list(map(dict, data)))"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
除了 fetchone
、fetchall
之外,还有一个fetchmany
,可以获取指定记录的条数。
import asyncio
from pprint import pprint
import aiomysql.sa as aio_saasync def main():# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:result = await conn.execute("SELECT * FROM girl")# 默认是获取一条, 得到的仍然是一个列表data = await result.fetchmany(2)pprint(list(map(dict, data)))"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
以上就是通过aiomysql
查询数据库中的记录,没什么难度。但是值得一提的是,await conn.execute
里面除了可以传递一个原生的SQL
语句之外,我们还可以借助SQLAlchemy
。
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))result = await conn.execute(sql)data = await result.fetchall()pprint(list(map(dict, data)))"""[{'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
添加记录
然后是添加记录,我们同样可以借助SQLAlchemy
帮助我们拼接SQL
语句。
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engineasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:# 我们还需要创建一个 SQLAlchemy 中的引擎, 然后将表反射出来s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)insert_sql = tbl.insert().values([{"name": "十六夜咲夜", "age": 17, "place": "红魔馆"},{"name": "琪露诺", "age": 60, "place": "雾之湖"}])# 注意: 执行的执行必须开启一个事务, 否则数据是不会进入到数据库中的async with conn.begin():# 同样会返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象# 尽管我们插入了多条, 但只会返回最后一条的插入信息result = await conn.execute(insert_sql)# 返回最后一条记录的自增 idprint(result.lastrowid)# 影响的行数print(result.rowcount)# 重新查询, 看看记录是否进入到数据库中async with engine.acquire() as conn:data = await (await conn.execute("select * from girl")).fetchall()data = list(map(dict, data))pprint(data)"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'},{'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '红魔馆'},{'age': 60, 'id': 17, 'name': '琪露诺', 'place': '雾之湖'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
还是很方便的,但是插入多条记录的话只会返回插入的最后一条记录的信息,所以如果你希望获取每一条的信息,那么就一条一条插入。
修改记录
修改记录和添加记录是类似的,我们来看一下。
import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, textasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)update_sql = tbl.update().where(text("name = '古明地觉'")).values({"place": "东方地灵殿"})# 同样需要开启一个事务async with conn.begin():result = await conn.execute(update_sql)print(result.lastrowid) # 0print(result.rowcount) # 1# 查询结果async with engine.acquire() as conn:data = await (await conn.execute("select * from girl where name = '古明地觉'")).fetchall()data = list(map(dict, data))pprint(data)"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '东方地灵殿'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
可以看到,记录被成功的修改了。
删除记录
删除记录就更简单了,直接看代码。
import asyncio
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, textasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)update_sql = tbl.delete() # 全部删除# 同样需要开启一个事务async with conn.begin():result = await conn.execute(update_sql)# 返回最后一条记录的自增 id, 我们之前修改了 id = 0 记录, 所以它跑到最后了print(result.lastrowid) # 0# 受影响的行数print(result.rowcount) # 6loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
此时数据库中的记录已经全部被删除了。
整体来看还是比较简单的,并且支持的功能也比较全面。
异步操作PostgreSQL
异步操作PostgreSQL
的话,我们有两个选择,一个是asyncpg
库,另一个是 aiopg
库。
asyncpg
是自己实现了一套连接驱动,而aiopg
则是对psycopg2
进行了封装,个人更推荐asyncpg
,性能和活跃度都比aiopg
要好。
下面来看看如何使用asyncpg
,首先是安装,直接pip install asyncpg
即可。
查询记录
首先是查询记录
import asyncio
from pprint import pprint
import asyncpgasync def main():# 创建连接数据库的驱动conn = await asyncpg.connect(host="localhost",port=5432,user="postgres",password="zgghyys123",database="postgres",timeout=10)# 除了上面的方式,还可以使用类似于 SQLAlchemy 的方式创建# await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# 调用 await conn.fetchrow 执行 select 语句,获取满足条件的单条记录# 调用 await conn.fetch 执行 select 语句,获取满足条件的全部记录row1 = await conn.fetchrow("select * from girl")row2 = await conn.fetch("select * from girl")# 返回的是一个 Record 对象,这个 Record 对象等于将返回的记录进行了一个封装# 至于怎么用后面会说print(row1) # <Record id=1 name='古明地觉' age=16 place='地灵殿'>pprint(row2)"""[<Record id=1 name='古明地觉' age=16 place='地灵殿'>,<Record id=2 name='椎名真白' age=16 place='樱花庄'>,<Record id=3 name='古明地恋' age=15 place='地灵殿'>]"""# 关闭连接await conn.close()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
以上我们演示了如何使用asyncpg
来获取数据库中的记录,我们看到执行select
语句的话,我们可以使用conn.fetchrow(query)
来获取满足条件的单条记录,conn.fetch(query)
来获取满足条件的所有记录。
Record 对象
我们说使用conn.fetchone
查询得到的是一个Record
对象,使用conn.fetch
查询得到的是多个Record
对象组成的列表,那么这个Rcord
对象怎么用呢?
import asyncio
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")row = await conn.fetchrow("select * from girl")print(type(row)) # <class 'asyncpg.Record'>print(row) # <Record id=1 name='古明地觉' age=16 place='地灵殿'># 这个 Record 对象可以想象成一个字典# 我们可以将返回的字段名作为 key, 通过字典的方式进行获取print(row["id"], row["name"]) # 1 古明地觉# 除此之外,还可以通过 get 获取,获取不到的时候会返回默认值print(row.get("id"), row.get("name")) # 1 古明地觉print(row.get("xxx"), row.get("xxx", "不存在的字段")) # None 不存在的字段# 除此之外还可以调用 keys、values、items,这个不用我说,都应该知道意味着什么# 只不过返回的是一个迭代器print(row.keys()) # <tuple_iterator object at 0x000001D6FFDAE610>print(row.values()) # <tuple_iterator object at 0x000001D6FFDAE610>print(row.items()) # <RecordItemsIterator object at 0x000001D6FFDF20C0># 我们需要转成列表或者元组print(list(row.keys())) # ['id', 'name', 'age', 'place']print(list(row.values())) # [1, '古明地觉', 16, '地灵殿']print(dict(row.items())) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}print(dict(row)) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())
当然我们也可以借助SQLAlchemy
帮我们拼接SQL
语句。
import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))# 我们不能直接传递一个 Select 对象, 而是需要将其转成原生的字符串才可以rows = await conn.fetch(str(sql))pprint(list(map(dict, rows))) """[{'id': 2, 'name': '椎名真白', 'place': '樱花庄'},{'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]"""# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())
此外,conn.fetch
里面还支持占位符,使用百分号加数字的方式,举个例子:
import asyncio
from pprint import pprint
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")rows = await conn.fetch("select * from girl where id != $1", 1)pprint(list(map(dict, rows)))"""[{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '樱花庄'},{'age': 15, 'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]"""# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())
还是推荐使用SQLAlchemy
的方式,这样更加方便一些,就像aiomysql
一样。但是对于asyncpg
而言,实际上接收的是一个原生的SQL
语句,是一个字符串,因此它不能像aiomysql
一样自动识别Select
对象,我们还需要手动将其转成字符串。而且这样还存在一个问题,至于是什么我们下面介绍添加记录的时候说。
添加记录
然后是添加记录,我们看看如何往库里面添加数据。
import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# 执行 insert 语句我们可以使用 executerow = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",'十六夜咲夜', 17, '红魔馆')pprint(row) # INSERT 0 1pprint(type(row)) # <class 'str'>await conn.close()if __name__ == '__main__':asyncio.run(main())
通过execute
可以插入单条记录,同时返回相关信息,但是说实话这个信息没什么太大用。除了execute
之外,还有executemany
,用来执行多条插入语句。
import asyncio
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# executemany:第一条参数是一个模板,第二条命令是包含多个元组的列表# 执行多条记录的话,返回的结果为 Nonerows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)",[('十六夜咲夜', 17, '红魔馆'), ('琪露诺', 60, '雾之湖')])print(rows) # None# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())
注意:如果是执行大量insert
语句的话,那么executemany
要比 execute
快很多,但是executemany
不具备事务功。
NO.1
往期推荐
Historical articles
介绍5个常用的Python库,赶紧收藏!!
20条优化 SQL 的宝贵建议,建议收藏!
系统性总结了 Numpy 的所有关键知识点,建议收藏!!
总结了25个Pandas Groupby 经典案例!!
分享、收藏、点赞、在看安排一下?
超实用的 Python 技巧,异步操作数据库!相关推荐
- 超实用的python技巧:python读写Excel表格的实例代码(简单实用)
@本文来源于公众号:csdn2299,喜欢可以关注公众号 程序员学府 这篇文章主要介绍了python读写Excel表格的方法,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可 ...
- python多线程读取数据库数据_Python基于多线程操作数据库相关知识点详解
Python基于多线程操作数据库相关问题分析 本文实例分析了Python多线程操作数据库相关问题.分享给大家供大家参考,具体如下: python多线程并发操作数据库,会存在链接数据库超时.数据库连接丢 ...
- python生成uuid_咸鱼高赞回答:有什么相见恨晚的Python技巧,附赠python最新教程...
日常工作几乎离不python.一路走来,他积累了不少有用的技巧和tips,现在就将这些技巧分享给大家.这些技巧将根据其首字母按A-Z的顺序进行展示. Python相关学习资料获取方式:转发文章+关注私 ...
- python初学者_面向初学者的20种重要的Python技巧
python初学者 Python is among the most widely used market programming languages in the world. This is be ...
- 超实用的脚本——检查oracle数据库是否存在潜伏的比特币勒索病毒
转载来源 : 超实用的脚本--检查oracle数据库是否存在潜伏的比特币勒索病毒 : http://www.safebase.cn/article-254989-1.html 概述 分享一个工作中经常 ...
- python建立sqlite数据库_python sqlite3 创建数据库
Python标准库14 数据库 (sqlite3) 作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明.谢谢! Python自带一个轻量级的关 ...
- python操作mysql数据库实现增删改查
Python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口. Python 数据库接口支持非常多的数据库,你可以选择适合你项目的数据库: ...
- python如何编写数据库_如何在几分钟内用Python编写一个简单的玩具数据库
python如何编写数据库 MySQL, PostgreSQL, Oracle, Redis, and many more, you just name it - databases are a re ...
- Python 操作 MongoDB 数据库!
作者 |黄伟呢 来源 |数据分析与统计学之美 MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. 先来看看MySQL与MongoDB 概念区别 ...
- Python与MySQL数据库的交互实战
作者 | Huang supreme 编辑 | 郭芮 图源 | 视觉中国 安装PyMySQL库 如果你想要使用python操作MySQL数据库,就必须先要安装pymysql库,这个库的安装很简单,直接 ...
最新文章
- c语言rank需要头文件吗,C++ std::rank用法及代码示例
- 在Visual C++ 中使用内联汇编
- python 字符串去重从小到大排列_110道题整理(1-60)
- php配置前缀为任一个 .htacess,httpd.ini和.htaccess的写法与配置规则
- Missing URI template variable 'XXXX' for method parameter of type String
- Kubernetes 入门教程
- hibernate中对象的3种状态----瞬时态、持久态、脱管态
- VirtualBox复制虚拟机
- Node.js HTTP
- Docker入门学习四之自己制作Docker镜像
- ANSYS19.0安装(无比详细的图文示范教程)
- SwitchHost提示管理员身份运行,不能修改hosts文件
- XML学习总结(三)——SAXReader解析xml文件数据
- 【转载】完全二叉树的高度为什么是对lgN向下取整
- haosou属于搜索引擎的_搜索引擎登录工具
- 普迪文集团:马来西亚留学必须了解的7个真相
- 路由器和交换机的工作原理---笔面试
- 在header中添加中文头信息
- photoshop cc 2018中文
- text style