Python 目前已经进化到了3.8版本,对操作数据库也提供了相应的异步支持。当我们做一个Web服务时,性能的瓶颈绝大部分都在数据库上,如果一个请求从数据库中读数据的时候能够自动切换、去处理其它请求的话,是不是就能提高并发量了呢。

(编者注:原文写于2020年2月,当时最新为Python3.8,文章内容现在仍未过时)

下面我们来看看如何使用Python异步操作MySQLPostgreSQL以及Redis,以上几个可以说是最常用的数据库了。至于SQLServerOracle,本人没有找到相应的异步驱动,有兴趣可以自己去探索一下。

而操作数据库无非就是增删改查,下面我们来看看如何异步实现它们。

异步操作MySQL

异步操作 MySQL 的话,需要使用一个aiomysql,直接 pip install aiomysql 即可。

aiomysql底层依赖于pymysql,所以aiomysql并没有单独实现相应的连接驱动,而是在pymysql之上进行了封装。

查询记录

下面先来看看如何查询记录。

import asyncio
import aiomysql.sa as aio_saasync def main():# 创建一个异步引擎engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10)# 通过 engine.acquire() 获取一个连接async with engine.acquire() as conn:# 异步执行, 返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象result = await conn.execute("SELECT * FROM girl")# 通过 await result.fetchone() 可以获取满足条件的第一条记录, 一个 <class 'aiomysql.sa.result.RowProxy'> 对象data = await result.fetchone()# 可以将 <class 'aiomysql.sa.result.RowProxy'> 对象想象成一个字典print(data.keys())  # KeysView((1, '古明地觉', 16, '地灵殿'))print(list(data.keys()))  # ['id', 'name', 'age', 'place']print(data.values())  # ValuesView((1, '古明地觉', 16, '地灵殿'))print(list(data.values()))  # [1, '古明地觉', 16, '地灵殿']print(data.items())  # ItemsView((1, '古明地觉', 16, '地灵殿'))print(list(data.items()))  # [('id', 1), ('name', '古明地觉'), ('age', 16), ('place', '地灵殿')]# 直接转成字典也是可以的print(dict(data))  # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}# 最后别忘记关闭引擎, 当然你在创建引擎的时候也可以通过 async with aio_sa.create_engine 的方式创建# async with 语句结束后会自动执行下面两行代码engine.close()await engine.wait_closed()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

怎么样,是不是很简单呢,和同步库的操作方式其实是类似的。但是很明显,我们在获取记录的时候不会只获取一条,而是会获取多条,获取多条的话使用 await result.fetchall() 即可。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_saasync def main():# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:result = await conn.execute("SELECT * FROM girl")# 此时的 data 是一个列表, 列表里面是 <class 'aiomysql.sa.result.RowProxy'> 对象data = await result.fetchall()# 将里面的元素转成字典pprint(list(map(dict, data)))"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

除了 fetchonefetchall之外,还有一个fetchmany,可以获取指定记录的条数。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_saasync def main():# 通过异步上下文管理器的方式创建, 会自动帮我们关闭引擎async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:result = await conn.execute("SELECT * FROM girl")# 默认是获取一条, 得到的仍然是一个列表data = await result.fetchmany(2)pprint(list(map(dict, data)))"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上就是通过aiomysql查询数据库中的记录,没什么难度。但是值得一提的是,await conn.execute里面除了可以传递一个原生的SQL语句之外,我们还可以借助SQLAlchemy

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():async with aio_sa.create_engine(host="xx.xxx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))result = await conn.execute(sql)data = await result.fetchall()pprint(list(map(dict, data)))"""[{'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

添加记录

然后是添加记录,我们同样可以借助SQLAlchemy帮助我们拼接SQL语句。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engineasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:# 我们还需要创建一个 SQLAlchemy 中的引擎, 然后将表反射出来s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)insert_sql = tbl.insert().values([{"name": "十六夜咲夜", "age": 17, "place": "红魔馆"},{"name": "琪露诺", "age": 60, "place": "雾之湖"}])# 注意: 执行的执行必须开启一个事务, 否则数据是不会进入到数据库中的async with conn.begin():# 同样会返回一个 <class 'aiomysql.sa.result.ResultProxy'> 对象# 尽管我们插入了多条, 但只会返回最后一条的插入信息result = await conn.execute(insert_sql)# 返回最后一条记录的自增 idprint(result.lastrowid)# 影响的行数print(result.rowcount)# 重新查询, 看看记录是否进入到数据库中async with engine.acquire() as conn:data = await (await conn.execute("select * from girl")).fetchall()data = list(map(dict, data))pprint(data)"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法森林'},{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'},{'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '红魔馆'},{'age': 60, 'id': 17, 'name': '琪露诺', 'place': '雾之湖'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

还是很方便的,但是插入多条记录的话只会返回插入的最后一条记录的信息,所以如果你希望获取每一条的信息,那么就一条一条插入。

修改记录

修改记录和添加记录是类似的,我们来看一下。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, textasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)update_sql = tbl.update().where(text("name = '古明地觉'")).values({"place": "东方地灵殿"})# 同样需要开启一个事务async with conn.begin():result = await conn.execute(update_sql)print(result.lastrowid)  # 0print(result.rowcount)   # 1# 查询结果async with engine.acquire() as conn:data = await (await conn.execute("select * from girl where name = '古明地觉'")).fetchall()data = list(map(dict, data))pprint(data)"""[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '东方地灵殿'}]"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

可以看到,记录被成功的修改了。

删除记录

删除记录就更简单了,直接看代码。

import asyncio
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, textasync def main():async with aio_sa.create_engine(host="xx.xx.xx.xxx",port=3306,user="root",password="root",db="_hanser",connect_timeout=10) as engine:async with engine.acquire() as conn:s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")tbl = Table("girl", MetaData(bind=s_engine), autoload=True)update_sql = tbl.delete()  # 全部删除# 同样需要开启一个事务async with conn.begin():result = await conn.execute(update_sql)# 返回最后一条记录的自增 id, 我们之前修改了 id = 0 记录, 所以它跑到最后了print(result.lastrowid)  # 0# 受影响的行数print(result.rowcount)   # 6loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

此时数据库中的记录已经全部被删除了。

整体来看还是比较简单的,并且支持的功能也比较全面。

异步操作PostgreSQL

异步操作PostgreSQL的话,我们有两个选择,一个是asyncpg库,另一个是 aiopg库。

asyncpg是自己实现了一套连接驱动,而aiopg则是对psycopg2进行了封装,个人更推荐asyncpg,性能和活跃度都比aiopg要好。

下面来看看如何使用asyncpg,首先是安装,直接pip install asyncpg 即可。

查询记录

首先是查询记录

import asyncio
from pprint import pprint
import asyncpgasync def main():# 创建连接数据库的驱动conn = await asyncpg.connect(host="localhost",port=5432,user="postgres",password="zgghyys123",database="postgres",timeout=10)# 除了上面的方式,还可以使用类似于 SQLAlchemy 的方式创建# await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# 调用 await conn.fetchrow 执行 select 语句,获取满足条件的单条记录# 调用 await conn.fetch 执行 select 语句,获取满足条件的全部记录row1 = await conn.fetchrow("select * from girl")row2 = await conn.fetch("select * from girl")# 返回的是一个 Record 对象,这个 Record 对象等于将返回的记录进行了一个封装# 至于怎么用后面会说print(row1)  # <Record id=1 name='古明地觉' age=16 place='地灵殿'>pprint(row2)"""[<Record id=1 name='古明地觉' age=16 place='地灵殿'>,<Record id=2 name='椎名真白' age=16 place='樱花庄'>,<Record id=3 name='古明地恋' age=15 place='地灵殿'>]"""# 关闭连接await conn.close()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上我们演示了如何使用asyncpg 来获取数据库中的记录,我们看到执行select语句的话,我们可以使用conn.fetchrow(query) 来获取满足条件的单条记录,conn.fetch(query)来获取满足条件的所有记录。

Record 对象

我们说使用conn.fetchone查询得到的是一个Record对象,使用conn.fetch查询得到的是多个Record对象组成的列表,那么这个Rcord对象怎么用呢?

import asyncio
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")row = await conn.fetchrow("select * from girl")print(type(row))  # <class 'asyncpg.Record'>print(row)  # <Record id=1 name='古明地觉' age=16 place='地灵殿'># 这个 Record 对象可以想象成一个字典# 我们可以将返回的字段名作为 key, 通过字典的方式进行获取print(row["id"], row["name"])  # 1 古明地觉# 除此之外,还可以通过 get 获取,获取不到的时候会返回默认值print(row.get("id"), row.get("name"))  # 1 古明地觉print(row.get("xxx"), row.get("xxx", "不存在的字段"))  # None 不存在的字段# 除此之外还可以调用 keys、values、items,这个不用我说,都应该知道意味着什么# 只不过返回的是一个迭代器print(row.keys())  # <tuple_iterator object at 0x000001D6FFDAE610>print(row.values())  # <tuple_iterator object at 0x000001D6FFDAE610>print(row.items())  # <RecordItemsIterator object at 0x000001D6FFDF20C0># 我们需要转成列表或者元组print(list(row.keys()))  # ['id', 'name', 'age', 'place']print(list(row.values()))  # [1, '古明地觉', 16, '地灵殿']print(dict(row.items()))  # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}print(dict(row))  # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())

当然我们也可以借助SQLAlchemy帮我们拼接SQL语句。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))# 我们不能直接传递一个 Select 对象, 而是需要将其转成原生的字符串才可以rows = await conn.fetch(str(sql))pprint(list(map(dict, rows)))  """[{'id': 2, 'name': '椎名真白', 'place': '樱花庄'},{'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]"""# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())

此外,conn.fetch里面还支持占位符,使用百分号加数字的方式,举个例子:

import asyncio
from pprint import pprint
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")rows = await conn.fetch("select * from girl where id != $1", 1)pprint(list(map(dict, rows)))"""[{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '樱花庄'},{'age': 15, 'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]"""# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())

还是推荐使用SQLAlchemy的方式,这样更加方便一些,就像aiomysql 一样。但是对于asyncpg而言,实际上接收的是一个原生的SQL 语句,是一个字符串,因此它不能像aiomysql一样自动识别Select对象,我们还需要手动将其转成字符串。而且这样还存在一个问题,至于是什么我们下面介绍添加记录的时候说。

添加记录

然后是添加记录,我们看看如何往库里面添加数据。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import textasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# 执行 insert 语句我们可以使用 executerow = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",'十六夜咲夜', 17, '红魔馆')pprint(row)  # INSERT 0 1pprint(type(row))  # <class 'str'>await conn.close()if __name__ == '__main__':asyncio.run(main())

通过execute可以插入单条记录,同时返回相关信息,但是说实话这个信息没什么太大用。除了execute之外,还有executemany,用来执行多条插入语句。

import asyncio
import asyncpgasync def main():conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")# executemany:第一条参数是一个模板,第二条命令是包含多个元组的列表# 执行多条记录的话,返回的结果为 Nonerows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)",[('十六夜咲夜', 17, '红魔馆'), ('琪露诺', 60, '雾之湖')])print(rows)  # None# 关闭连接await conn.close()if __name__ == '__main__':asyncio.run(main())

注意:如果是执行大量insert语句的话,那么executemany要比 execute 快很多,但是executemany不具备事务功。

NO.1

往期推荐

Historical articles

介绍5个常用的Python库,赶紧收藏!!

20条优化 SQL 的宝贵建议,建议收藏!

系统性总结了 Numpy 的所有关键知识点,建议收藏!!

总结了25个Pandas Groupby 经典案例!!

分享、收藏、点赞、在看安排一下?

超实用的 Python 技巧,异步操作数据库!相关推荐

  1. 超实用的python技巧:python读写Excel表格的实例代码(简单实用)

    @本文来源于公众号:csdn2299,喜欢可以关注公众号 程序员学府 这篇文章主要介绍了python读写Excel表格的方法,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可 ...

  2. python多线程读取数据库数据_Python基于多线程操作数据库相关知识点详解

    Python基于多线程操作数据库相关问题分析 本文实例分析了Python多线程操作数据库相关问题.分享给大家供大家参考,具体如下: python多线程并发操作数据库,会存在链接数据库超时.数据库连接丢 ...

  3. python生成uuid_咸鱼高赞回答:有什么相见恨晚的Python技巧,附赠python最新教程...

    日常工作几乎离不python.一路走来,他积累了不少有用的技巧和tips,现在就将这些技巧分享给大家.这些技巧将根据其首字母按A-Z的顺序进行展示. Python相关学习资料获取方式:转发文章+关注私 ...

  4. python初学者_面向初学者的20种重要的Python技巧

    python初学者 Python is among the most widely used market programming languages in the world. This is be ...

  5. 超实用的脚本——检查oracle数据库是否存在潜伏的比特币勒索病毒

    转载来源 : 超实用的脚本--检查oracle数据库是否存在潜伏的比特币勒索病毒 : http://www.safebase.cn/article-254989-1.html 概述 分享一个工作中经常 ...

  6. python建立sqlite数据库_python sqlite3 创建数据库

    Python标准库14 数据库 (sqlite3) 作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明.谢谢! Python自带一个轻量级的关 ...

  7. python操作mysql数据库实现增删改查

    Python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口. Python 数据库接口支持非常多的数据库,你可以选择适合你项目的数据库: ...

  8. python如何编写数据库_如何在几分钟内用Python编写一个简单的玩具数据库

    python如何编写数据库 MySQL, PostgreSQL, Oracle, Redis, and many more, you just name it - databases are a re ...

  9. Python 操作 MongoDB 数据库!

    作者 |黄伟呢 来源 |数据分析与统计学之美 MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. 先来看看MySQL与MongoDB 概念区别 ...

  10. Python与MySQL数据库的交互实战

    作者 | Huang supreme 编辑 | 郭芮 图源 | 视觉中国 安装PyMySQL库 如果你想要使用python操作MySQL数据库,就必须先要安装pymysql库,这个库的安装很简单,直接 ...

最新文章

  1. c语言rank需要头文件吗,C++ std::rank用法及代码示例
  2. 在Visual C++ 中使用内联汇编
  3. python 字符串去重从小到大排列_110道题整理(1-60)
  4. php配置前缀为任一个 .htacess,httpd.ini和.htaccess的写法与配置规则
  5. Missing URI template variable 'XXXX' for method parameter of type String
  6. Kubernetes 入门教程
  7. hibernate中对象的3种状态----瞬时态、持久态、脱管态
  8. VirtualBox复制虚拟机
  9. Node.js HTTP
  10. Docker入门学习四之自己制作Docker镜像
  11. ANSYS19.0安装(无比详细的图文示范教程)
  12. SwitchHost提示管理员身份运行,不能修改hosts文件
  13. XML学习总结(三)——SAXReader解析xml文件数据
  14. 【转载】完全二叉树的高度为什么是对lgN向下取整
  15. haosou属于搜索引擎的_搜索引擎登录工具
  16. 普迪文集团:马来西亚留学必须了解的7个真相
  17. 路由器和交换机的工作原理---笔面试
  18. 在header中添加中文头信息
  19. photoshop cc 2018中文
  20. text style

热门文章

  1. 如何在论文后面插参考文献
  2. Openbravo3.0 体系结构
  3. 小米手环4怎么使用_小米运动手环4使用说明
  4. 在线学习编程网站收集
  5. 三国之空城计游戏攻略
  6. 从此甩掉光驱 U盘安装系统最详攻略
  7. 杭州用城市大脑治堵4年,怎么限牌还玩升级?
  8. vue获取内外网ip地址
  9. 什么是搜索引擎关键词?搜索引擎关键词优化
  10. Redis 面试常问问题