SQLAlchemy Async
目录
架构
安装:
Engine:
Result:
Session:
MetaData:
Table And ORM:
反射表:
插入数据:
查询数据:
更新和删除:
使用流式:
Async ORM:
关联对象加载:
Event 钩子:
SQL 编译缓存:
Alembic 数据库迁移:
架构
版本要求:Cpython +3.7
安装:
pip install SQLAlchemy 安装对应数据库的DBAPI: https://docs.sqlalchemy.org/en/20/dialects/mysql.html (我使用了aiomysql,databases要求sqlalchemy<1.4,所以不适用) # 查看版本 >>> import sqlalchemy >>> sqlalchemy.__version__
Engine:
# 创建enginefrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import declarative_base, sessionmakerfrom sqlalchemy import text#参数 echo:打印执行日志,future:使用2.0新特性,也可以使用async_engine_from_config创建,engine直到第一次请求数据库才会真正连接到数据库,称为延迟初始化engine = create_async_engine("mysql+aiomysql://127.0.0.1:3307/test", echo=True, future=True)Session = sessionmaker(class_=AsyncSession,autocommit=False,autoflush=False,bind=engine)Base = declarative_base()#等价于 from sqlalchemy.orm import registry# mapper_registry = registry()# Base = mapper_registry.generate_base()# 简单使用async def connect_query():async with engine.connect() as conn:result = await conn.execute(text("SELECT x, y FROM some_table"))for row in result:print(f"x: {row.x} y: {row.y}")# 手动提交: 使用冒号声明参数async def connect_insert():async with engine.connect() as conn:await conn.execute(text("CREATE TABLE some_table (x int, y int)"))await conn.execute(text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),{"x": 1, "y": 1}, {"x": 2, "y": 4})await conn.commit() # 自动提交:绑定多个参数async def connect_insert():async with engine.begin() as conn:await conn.execute(text("CREATE TABLE some_table (x int, y int)"))await conn.execute(text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),[{"x": 1, "y": 1}, {"x": 2, "y": 4}])await conn.commit()# 使用bindparams绑定参数async def connect_query():stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x,y").bindparams(y=6)async with engine.connect() as conn:result = await conn.execute(stmt)for row in result:print(f"x: {row.x} y: {row.y}")# Engine.connect()从连接池中返回一个Connection对象,连接对象会被回收,而不是真正的关闭# connection.execute返回一个CursorResult对象,类似游标,CursorResult在所有结果行(如果有)用尽时关闭。特别的如UPDATE 语句(没有任何返回的行),在执行后立即释放游标资源
Result:
1. 每一条数据都是一个Row(类似具名元组),因此可以访问属性,索引2. 返回字典对象:for dict_row in result.mappings():x = dict_row['x']y = dict_row['y']3. 获取所有结果: all()等价fetchall() → List[sqlalchemy.engine.row.Row]4. 获取特定列:columns(*col_expressions: Union[str, Column[Any], int])5. 获取指定条数:fetchmany(size: Optional[int] = None) → List[sqlalchemy.engine.row.Row]6. 获取单行:fetchone() → Optional[sqlalchemy.engine.row.Row]Result.first() → Optional[sqlalchemy.engine.row.Row] (丢弃其他数据)7. 重复读取结果Result.freeze() → sqlalchemy.engine.FrozenResult8. 读取列名称Result.keys() → sqlalchemy.engine.RMKeyView9. 返回字典格式数据mappings() → sqlalchemy.engine.MappingResult10. 结果合并merge(*others: sqlalchemy.engine.result.Result) → sqlalchemy.engine.MergedResult11. 返回一条数据,不存在或多个时抛出异常one() → sqlalchemy.engine.Row12. 返回一条数据或None one_or_none() → Optional[sqlalchemy.engine.row.Row13. 分块返回,(但后台依然一次性缓存了所有数据)partitions(size: Optional[int] = one)Iterator[List[sqlalchemy.engine.row.Row]]14. 返回第一行的第一列值,如果没有返回None: scalar() → Any15. 返回第一行,不存在或多个则报错:scalar_one() → Any16. 返回第一条或者None:scalar_one_or_none() → Optional[Any]17. 返回所有数据的某一列值(默认返回第一列):scalars(index: Union[str, Column[Any], int] = 0)18. 批量返回:yield_per(num: int) → SelfResultsqlalchemy.engine.ScalarResult sqlalchemy.engine.MappingResultsqlalchemy.engine.CursorResult都类似,编码过程中注意看result的类型,查看具体方法:https://docs.sqlalchemy.org/en/20/core/connections.html#sqlalchemy.engine.Result
Session:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
engine = create_async_engine("mysql+aiomysql://127.0.0.1:3307/test", echo=True, future=True)
Session = sessionmaker(class_=AsyncSession,autocommit=False,autoflush=False,bind=engine)
Base = declarative_base()
async def session_query():async with Session() as session:stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)result = await session.execute(stmt)for row in result:print(f"x: {row.x} y: {row.y}")async def session_update():async with Session() as session:result = await session.execute( text("UPDATE some_table SET y=:y WHERE x=:x"),[{"x": 9, "y":11}, {"x": 13, "y": 15}])await session.commit()# result 为CursorResult对象,参数绑定方式与connect相同
MetaData:
# metadata 保存所有Table 和 ORM对象
from sqlalchemy import Table, Column, Integer, String
from sqlalchemy import MetaData
metadata_obj = MetaData()
user_table = Table("user_account",metadata_obj,Column('id', Integer, primary_key=True),Column('name', String(30)),Column('fullname', String))
>>> user_table.c.nameColumn('name', String(length=30), table=<user_account>)
>>> user_table.c.keys()
['id', 'name', 'fullname']
# 生成表到数据库
metadata_obj.create_all(engine)
# 删除所有表
metadata_obj.drop_all()
Table And ORM:
from sqlalchemy.orm import registry,eclarative_basefrom sqlalchemy.orm import relationshipBase = registry().generate_base()Base = declarative_base(bind=engine) # 更简单的声明Base,推荐class User(Base):__tablename__ = 'user_account'id = Column(Integer, primary_key=True)name = Column(String(30))fullname = Column(String)# 如果使用back_populates,必须在两边对应声明addresses = relationship("Address", back_populates="user")def __repr__(self):return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"class Address(Base):__tablename__ = 'address'id = Column(Integer, primary_key=True)email_address = Column(String, nullable=False)user_id = Column(Integer, ForeignKey('user_account.id'))user = relationship("User", back_populates="addresses")def __repr__(self):return f"Address(id={self.id!r}, email_address={self.email_address!r})"# User.__table__ 就是一个Table对象,相反的你也可以先声明一个Table再使用这个Table声明一个ORM类from sqlalchemy import Table, Column, Integer, Stringfrom sqlalchemy import MetaDatametadata_obj = MetaData()user_table = Table("user_account",metadata_obj,Column('id', Integer, primary_key=True),Column('name', String(30)),Column('fullname', String))Base = declarative_base(bind=engine,metadata=metadata_obj)class User(Base):__table__ = user_tableaddresses = relationship("Address", backref="user")def __repr__(self):return f"User({self.name!r}, {self.fullname!r})"# 生成所有表Base.metadata.create_all(engine)
反射表:
# 所谓反射就是读取现有数据库的表结构,生成对应的Table对象,也支持View反射,目前只支持同步enginefrom sqlalchemy import create_enginefrom sqlalchemy import MetaDatametadata_obj = MetaData()engine = create_engine("mysql+pymysql://127.0.0.1:3307/test", echo=True, future=True)# 反射单个表some_table = Table("some_table", metadata_obj, autoload_with=engine)# 覆盖原有属性或增加新属性mytable = Table('mytable', metadata_obj,Column('id', Integer, primary_key=True), Column('mydata', Unicode(50)), autoload_with=some_engine)# 一次反射所有表metadata_obj = MetaData()metadata_obj.reflect(bind=engine)users_table = metadata_obj.tables['users']addresses_table = metadata_obj.tables['addresses']# 更底层的反射可以看 inspect# 如果我想在异步中使用反射,先使用同步engine得到反射后的metadata_obj对象,在将它传递给Base就可以了# 也可以这样import asynciofrom sqlalchemy.ext.asyncio import create_async_enginefrom sqlalchemy.ext.asyncio import AsyncSessionfrom sqlalchemy import inspectengine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")def use_inspector(conn):inspector = inspect(conn)# use the inspectorprint(inspector.get_view_names())# return any value to the callerreturn inspector.get_table_names()async def async_main():async with engine.connect() as conn:tables = await conn.run_sync(use_inspector)
插入数据:
# 插入数据from sqlalchemy import insert# 插入单条数据async def insert_one():async with Session() as session:stmt=insert(user_table).values(name='spongebob',fullname="SpongebobSquarepants")result = await session.execute(stmt)await session.commit()# 获取插入数据的主键(注意有的数据库可能不支持,必须使用insert方法构建,直接使用text方法构建的插入不能访问该属性)result.inserted_primary_key# 插入多条数据async def insert_many():async with Session() as session:result = await session.execute(stmt=insert(user_table),[{"name": "sandy", "fullname": "Sandy Cheeks"},{"name": "patrick", "fullname": "Patrick Star"}])await session.commit()# 插入数据的复合主键result.inserted_primary_key_rows# insert from select 我没有这样的场景>>> select_stmt = select(user_table.c.id, user_table.c.name + "@aol.com")>>> insert_stmt = insert(address_table).from_select(... ["user_id", "email_address"], select_stmt... )>>> print(insert_stmt.returning(address_table.c.id, address_table.c.email_address))# 部分数据库支持返回值insert_stmt=insert(address_table).returning(address_table.c.id,address_table.c.email_address)
查询数据:
# 查询数据async def query():async with Session() as session:# stmt=select(user_table).where(user_table.c.name == 'spongebob')stmt = select(User).where(User.name == 'spongebob')result = await session.execute(stmt)data = result.all()# 排序 select(User).where(User.name == 'spongebob').order_by(User.name.desc())# 分组select(User).where(User.name=='spongebob').group_by(User.name).having(func.count(Address.id) > 1)select(Address.user_id,func.count(Address.id).label('num_addresses')).\group_by("user_id").order_by("user_id", desc("num_addresses"))# 属性别名select(("Username: " + user_table.c.name).label("username"))# 表别名 from sqlalchemy.orm import aliased address_alias_1 = aliased(Address)# where : ==、!=、>、<、 >= 、<=# and_ or_ >>> from sqlalchemy import and_, or_>>> print(... select(Address.email_address).... where(... and_(... or_(User.name == 'squidward', User.name == 'sandy'),... Address.user_id == User.id... )... )... )#filter_by 注意是一个等号stmt = select(Application).filter_by(app_id = app_id)# 查询数量select(func.count('*')).select_from(user_table)# 指定返回数量select(Application).filter_by(app_id = app_id).limit(1)# 关联查询(默认为内联)select(user_table.c.name,address_table.c.email_address).join_from(user_table,address_table)user_alias_1 = user_table.alias()user_alias_2 = user_table.alias()select(user_alias_1.c.name, user_alias_2.c.name)\.join_from(user_alias_1, user_alias_2, user_alias_1.c.id > user_alias_2.c.id)select(user_table.c.name, address_table.c.email_address).join(address_table)# 指定关联条件select(address_table.c.email_address).select_from(user_table).join(address_table, user_table.c.id == address_table.c.user_id)#外联select(user_table).join(address_table, isouter=True)#全联select(user_table).join(address_table, full=True)# 子查询subq = select(func.count(address_table.c.id).label("count"),address_table.c.user_id).group_by(address_table.c.user_id).subquery()select(subq.c.user_id, subq.c.count)# CTE临时结果集 cte()# UNION Union ALLstmt1 = select(User).where(User.name == 'sandy')stmt2 = select(User).where(User.name == 'spongebob')u = union_all(stmt1, stmt2)# 判断是否存在stmt = select(User).where(User.name == 'sandy').exists()# 使用数据库函数 func.count():https://docs.sqlalchemy.org/en/14/core/functions.html.....更多:https://docs.sqlalchemy.org/en/14/tutorial/data_select.html
更新和删除:
# 更新数据from sqlalchemy import updatestmt = (update(user_table).where(user_table.c.name == 'patrick').values(fullname='Patrick the Star'))# 查询并更新from sqlalchemy import bindparamstmt = (update(user_table).where(user_table.c.name == bindparam('oldname')).values(name=bindparam('newname')))async def update_db():async with engine.begin() as conn:conn.execute(stmt,[{'oldname':'jack', 'newname':'ed'},{'oldname':'wendy', 'newname':'mary'},{'oldname':'jim', 'newname':'jake'},])# 有序更新update_stmt = (update(some_table).ordered_values((some_table.c.y, 20),(some_table.c.x, some_table.c.y + 10)))# 删除数据from sqlalchemy import deletestmt = delete(user_table).where(user_table.c.name == 'patrick')# 获取执行结果影响行数result.rowcount# 指定返回值update_stmt = (update(user_table).where(user_table.c.name == 'patrick').values(fullname='Patrick the Star').returning(user_table.c.id, user_table.c.name)
使用流式:
# 访问大量数据时async with async_engine.connect() as conn:result = await conn.stream("select * from user")async for rows in result.partitions(size=100):for row in rows:yield row._mapping
Async ORM:
import asynciofrom sqlalchemy import Columnfrom sqlalchemy import DateTimefrom sqlalchemy import ForeignKeyfrom sqlalchemy import funcfrom sqlalchemy import Integerfrom sqlalchemy import Stringfrom sqlalchemy.ext.asyncio import AsyncSessionfrom sqlalchemy.ext.asyncio import create_async_enginefrom sqlalchemy.future import selectfrom sqlalchemy.orm import declarative_basefrom sqlalchemy.orm import relationshipfrom sqlalchemy.orm import selectinloadfrom sqlalchemy.orm import sessionmakerBase = declarative_base()class A(Base):__tablename__ = "a"id = Column(Integer, primary_key=True)data = Column(String)create_date = Column(DateTime, server_default=func.now())bs = relationship("B")# required in order to access columns with server defaults# or SQL expression defaults, subsequent to a flush, without# triggering an expired load__mapper_args__ = {"eager_defaults": True}class B(Base):__tablename__ = "b"id = Column(Integer, primary_key=True)a_id = Column(ForeignKey("a.id"))data = Column(String)async def async_main():engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test",echo=True,)async with engine.begin() as conn:await conn.run_sync(Base.metadata.drop_all)await conn.run_sync(Base.metadata.create_all)# expire_on_commit=False will prevent attributes from being expired# after commit.async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)async with async_session() as session:async with session.begin():session.add_all([A(bs=[B(), B()], data="a1"),A(bs=[B()], data="a2"),A(bs=[B(), B()], data="a3"),])stmt = select(A).options(selectinload(A.bs))result = await session.execute(stmt)for a1 in result.scalars():print(a1)print(f"created at: {a1.create_date}")for b1 in a1.bs:print(b1)result = await session.execute(select(A).order_by(A.id))a1 = result.scalars().first()a1.data = "new data"await session.commit()# access attribute subsequent to commit; this is what# expire_on_commit=False allowsprint(a1.data)# for AsyncEngine created in function scope, close and# clean-up pooled connectionsawait engine.dispose()asyncio.run(async_main())#调用同步方法 session.run_sync(sync_func)#多对多关系的删除1. 查询出要删除的多对多关系 user.hobbies2. user.hobbies = [] 删除多对多关系# 实际上直接删除中间表更便捷
关联对象加载:
# 配置orm关系为lazy="noload",我认为非必要无需加载关联对象,lazy其他可使用的值为:'select'、'joined'、'subquery'、'selectin'、'raise',parent = relationship('Parent', bacref='children', lazy='noload')# 在查询中指定需要加载的关联对象session.query(Parent).options(selectinload(Parent.children)).all()详细文档:https://docs.sqlalchemy.org/en/20/orm/loading_relationships.html?highlight=lazy#sqlalchemy.orm.Load.lazyload
Event 钩子:
from sqlalchemy.engine import Enginefrom sqlalchemy.orm import Session@event.listens_for(Engine, "before_execute")def my_before_execute(conn, clauseelement, multiparams, params, execution_options):print("before execute!")@event.listens_for(Session, "after_commit")def my_after_commit(session):print("after commit!")# 更多event节点见 ConnectionEvents类
SQL 编译缓存:
engine = create_engine("postgresql://scott:tiger@localhost/test", query_cache_size=100)#使用的内部缓存称为 LRUCache,DDL 构造通常不参与缓存,因为它们通常不会被重复第二次
# 自定义缓存
my_cache = {}
with engine.connect().execution_options(compiled_cache=my_cache) as conn:conn.execute(table.select())
Alembic 数据库迁移:
1. 安装:pip install alembic
2. 初始化:alembic init alembic
3. 修改配置:alembic.ini配置文件 (你的数据库连接信息)sqlalchemy.url = mysql+pymysql://root:admin@localhost/alembic_demoenv.py (保证你的model被加载到)import osimport sysfrom xxx import Base# 把当前项目路径加入到path中sys.path.append(os.path.dirname(os.path.dirname(__file__)))target_metadata = Base.metadata
4. 生成迁移文件:alembic revision --autogenerate -m "first commit"
5. 更新到数据库:alembic upgrade head
最后不得不吐槽sqlalchemy的文档实在是太乱了,多个版本交错,期待2.0有所改善 官方文档地址:SQLAlchemy Documentation — SQLAlchemy 2.0 Documentation
SQLAlchemy Async相关推荐
- 数据库和ORMS:使用SQLAlchemy与数据库通信
文章目录 1. 环境安装 2. 使用SQLAlchemy与SQL数据库通信 2.1 创建表 2.2 连接数据库 2.3 insert.select 2.4 update.delete 2.5 rela ...
- async python_Async Python 竟不比sync Python 快,怎么回事?
[CSDN编者按]在实际的基准测试下,async (异步)Python比"sync"(同步) Python要慢.而更让人担心的是,async框架在负载下会不稳定. 作者 | Cal ...
- Async Python 竟不比sync Python 快,怎么回事?
[CSDN编者按]在实际的基准测试下,async (异步)Python比"sync"(同步) Python要慢.而更让人担心的是,async框架在负载下会不稳定. 作者 | Cal ...
- 软件测试 | 测试开发 | Aiomysql 与 Sqlalchemy 的使用
背景 之前一直使用tornado做项目,数据库一直使用mongo与redis居多,凭借其优异的异步特性工作的也很稳定高效,最近的项目需要使用 mysql ,由于之前在使用mongo与redis时所使用 ...
- aiomysql + sqlalchemy(ORM) 配合使用
官方文档:https://aiomysql.readthedocs.io/en/latest/sa.html (过时) 其他教程:https://gzm1997.github.io/2018/05/2 ...
- 异步sqlalchemy ORM session使用总结
声明Base和提供异步session 声明Base Base = declarative_base() 模型类需要集成该Base, 建议所有模型类都统一集成同一个Base, 这样在对模型类的创建和修改 ...
- SQLALchemy之Python连接MySQL
20220225 https://www.cnblogs.com/toheart/p/9802990.html pymssql连接sqlserver https://blog.csdn.net/qq_ ...
- setTimeout、setInterval、promise、async/await的顺序详解(多种情况,非常详细~)
本文很长,列举的情况很多. 在阅读本文之前,如果您有充足的时间,请新建一个项目与本文一同实践. 每段代码都有对应的解释,但是自己动手尝试印象才会更深哦~ setInterval:表示多久执行一次,需要 ...
- 阮一峰老师的ES6入门:async 函数
async 函数 1. 含义 ES2017 标准引入了 async 函数,使得异步操作变得更加方便. async 函数是什么?一句话,它就是 Generator 函数的语法糖. 前文有一个 Gener ...
最新文章
- 嵌入式开发板设置无密码登录
- 『原创』再谈用 php 实现域名 whois 信息查询
- python有什么作用-大数据学习之python语言有什么作用?
- .net 插件式开发学习总结
- java主线程控制子线程_CountDownLatch控制主线程等子线程执行完--Java多线程
- 总结:一款Loading动画的实现思路
- POJ3889-Fractal Streets【分形,递归,分治】
- 诗歌rails 之自定义Helper模块
- 写给大数据开发初学者的话5
- Python3与OpenCV3.3 图像处理(五)--图像运算
- python的运行环境是如何搭建的_教女朋友学Python运行环境搭建
- linux系统怎么刷新,Fedora Linux如何更新系统
- Python破解验证码,只要15分钟就够了!
- 译文伪原创的全文翻译软件
- 【文档/键值数据库】文档数据库和键值数据库有什么区别
- iPAD越狱后下载破解版的pad软件方法总录
- Max函数、Min函数
- vue 判断元素内容是否超过宽度
- 得物(毒)APP,8位抽奖码需求,这不就是产品给我留的数学作业!
- React使用iconfont阿里巴巴矢量图库