声明Base和提供异步session

声明Base

Base = declarative_base()

模型类需要集成该Base, 建议所有模型类都统一集成同一个Base, 这样在对模型类的创建和修改统一管理。

sqlalchemy 使用异步ORM, 需要使用到异步的session:

提供异步session

通过装饰器提供异步session, 这样就不需要在操作数据库的方法中每次实例化一个异步session, 需要的地方装饰一下就行了。

database.py:

import contextlib
from typing import Callable
from asyncio import current_task
from functools import wrapsfrom sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base# 将连接数据库的URI 写在配置文件中读取
from dqc import SQLALCHEMY_URI# 所有模型都要统一继承该Base
Base = declarative_base()class DatabaseManager:"""连接元数据库的类,在__init__中进行初始化"""def __init__(self):self._engine = create_async_engine(SQLALCHEMY_URI, echo=False,)self.session = Noneself.initialize()def initialize(self, scope_function: Callable = None):"""Configure class for creating scoped sessions."""# Create the session factory classesasync_session_factory  = sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession)self.session = async_scoped_session(async_session_factory, scopefunc=current_task)def cleanup(self):"""Cleans up the database connection pool."""if self._engine is not None:self._engine.dispose()@contextlib.asynccontextmanager
async def create_session():"""Contextmanager that will create and teardown a session."""db = DatabaseManager()session = db.sessiontry:yield sessionawait session.commit()except Exception:await session.rollback()raisefinally:await session.close()def provide_session(func):"""Function decorator that provides a session if it isn't provided.If you want to reuse a session or run the function as part of adatabase transaction, you pass it to the function, if not this wrapperwill create one and close it for you."""@wraps(func)async def wrapper(*args, **kwargs):arg_session = 'session'func_params = func.__code__.co_varnamessession_in_args = arg_session in func_params and \func_params.index(arg_session) < len(args)session_in_kwargs = arg_session in kwargsif session_in_kwargs or session_in_args:return await func(*args, **kwargs)else:async with create_session() as session:kwargs[arg_session] = sessionreturn await func(*args, **kwargs)return wrapper

使用异步session案例table:

一个规则模板对应多个规则,三张表:规则模板表,规则表,关联关系表

规则模板表

class RuleTemplate(Base):__tablename__ = 'd_rule_template'id = Column(Integer, primary_key=True, comment='规则模板id')template_name = Column(String(250), nullable=False, comment='规则模板名称')rules = relationship('RuleInfo', lazy='joined', secondary=template_rules, backref='template')deletion = Column(Integer, nullable=False, comment='是否删除')@classmethod@provide_sessionasync def add_template(cls, session=None, **kwargs):pass@classmethod@provide_sessionasync def delete_template(cls, id, session=None):pass@classmethod@provide_sessionasync def get_rule_template_by_id(cls, id, session=None):pass@classmethod@provide_sessionasync def update_template(cls, session=None, **kwargs):pass

规则表

from database import Base, provide_sessionclass RuleInfo(Base):__tablename__ = 'd_rule_info'id = Column(Integer, primary_key=True)rule_name = Column(String(250), comment='规则名称')rule_type = Column(String(100), nullable=True, comment="规则类型")deletion = Column(Integer, nullable=False, comment='是否删除')@classmethod@provide_sessionasync def add_rule(cls, session=None, **kwargs):pass@classmethod@provide_sessionasync def delete_rule(cls, session=None, **kwargs):pass@classmethod@provide_sessionasync def get_rule_by_id(cls, id, session=None):pass@classmethod@provide_sessionasync def update_rule_info(cls, session=None, **kwargs):pass

关联关系表

template_rules = Table('d_template_rule_relation',Base.metadata,Column('id', Integer, primary_key=True),Column('rule_template_id', Integer, ForeignKey('d_rule_template.id')),Column('rule_info_id', Integer, ForeignKey('d_rule_info.id')),UniqueConstraint('rule_template_id', 'rule_info_id')
)

增加

 @classmethod@provide_sessionasync def add_template(cls, session=None, **kwargs):"""add rule template:param template_name: 模板名称:return: rule_template.id"""async with session() as session:rule_template = RuleTemplate()session.add(rule_template)for k, v in kwargs.items():if v is None:continuesetattr(rule_template, k, v)await session.commit()return rule_template.id@classmethod@provide_sessionasync def add_rule(cls, session=None, **kwargs):"""add rule:param rule_name: 规则名称:return: rule.id"""async with session() as session:rule = RuleInfo()session.add(rule)for k, v in kwargs.items():if v is None:continuesetattr(rule, k, v)await session.commit()return rule.id

删除

    @classmethod@provide_sessionasync def delete_template(cls, id, session=None):"""软删除模板:param id: 模板id"""async with session() as session:await session.execute(update(RuleTemplate).where(RuleTemplate.id == id).values(deletion=1))# 直接删除数据# await session.execute(#    delete(RuleTemplate).where(RuleTemplate.id == id)# )await session.commit()@classmethod@provide_sessionasync def delete_rule(cls, session=None, **kwargs):"""删除规则:param id: 规则id:return:"""id = kwargs.get("id")def fetch_and_update_objects(session2):# 这里使用闭包函数来写同步方法result = session2.execute(select(RuleInfo).where(RuleInfo.id == id))for rule in result.scalars():rule.deletion = 1try:# 如果该规则引用了规则模板,删除规则的同时需要删除关联表中的数据rule.template[0].rules.remove(rule)except IndexError:passasync with session() as session2:# 删除后将模板与规则桥表删除对应数据# 通过rule.template 来获取规则关联的模板对象为同步代码,在异步session 中执行同步代码# 需要使用session.run_sync(fetch_and_update_objects) 方法# fetch_and_update_objects 为同步代码方法名await session2.run_sync(fetch_and_update_objects)await session2.commit()

查询

 @classmethod@provide_sessionasync def get_rule_template_by_id(cls, id, session=None):"""根据模板id查询模板对象"""results = await session.execute(select(RuleTemplate).where(RuleTemplate.id == id))data = results.scalars().first()return data@classmethod@provide_sessionasync def get_rule_by_id(cls, id, session=None):"""根据规则id查询规则对象"""rule_info = Nonedef get_template(session2):nonlocal rule_inforesults = session2.execute(select(RuleInfo).where(RuleInfo.id == id))rows = results.fetchall()for row in rows:# 由于rule_template 表rules = relationship('RuleInfo', lazy='joined', secondary=template_rules, backref='template')# 为了提高查询效率这里关联查询的关系为lazy='joined', # 会导致查询规则对象时不会主动将该规则绑定的模板对象加载出来,# 需要通过使用同步代码rule.template 主动加载模板对象,# 否在在session 结束后,获取的规则对象将没有模板对象信息rule = row.RuleInfotemplate = rule.templaterule_info = rulerule_info.template = templateasync with session() as session2:# 加载rule.template 属于同步代码,需要使用session2.run_sync() 方法await session2.run_sync(get_template)return rule_info

join关联查询

增加一张模型表:规则校验结果表

class CheckResult(Base):__tablename__ = 'd_check_result'id = Column(Integer, primary_key=True)rule_id = Column(Integer, nullable=False, comment='规则id')plan_execution_date = Column(Integer, nullable=False, comment='计划执行时间,时间戳格式')real_execution_date = Column(Integer, nullable=True, comment='实际执行时间,时间戳格式')time_duration = Column(Integer, nullable=False, comment='执行时长')check_result = Column(Integer, nullable=True, comment="校验结果,1 通过 2异常 3等待结果")
@classmethod
@provide_session
async def get_rule_result_list(cls, session=None, **kwargs):""" 获取结果列表信息:param page_num: 页码:param page_size: 页面大下:param query: 模糊查询条件:param plan_execution_date: 过滤条件:计划执行时间:param check_result: 过滤条件:校验结果类型:return:"""# 获取模糊查询条件query = kwargs.get("query")# 获取准确过滤条件plan_execution_date = kwargs.get('plan_execution_date')check_results = kwargs.get('check_results')# 分页条件page_num = kwargs.get("page_num", 1)page_size = kwargs.get("page_size", 40)# and 拼接准确查询SQL ORMbase_filter = and_(CheckResult.plan_execution_date <= plan_execution_date if plan_execution_date else True,CheckResult.check_result.in_(check_results) if state else True,RuleInfo.deletion != 1)# or 拼接模糊查询SQL ORMquery_rule = or_(RuleInfo.rule_name.like('%{}%'.format(query)),RuleInfo.rule_type.like('%{}%'.format(query)),)async with session() as session:# func.count() 查询总数total = await session.execute(select([func.count()]).select_from(CheckResult, RuleInfo).outerjoin_from(CheckResult, RuleInfo, RuleInfo.id == CheckResult.rule_id).filter(base_filter, query_rule))total = total.scalar()# 左连接关联查询base_join_select = select(CheckResult.rule_id.label('rule_id'),CheckResult.check_result.label('check_result')).outerjoin_from(CheckResult, RuleInfo, RuleInfo.id == CheckResult.rule_id)\.outerjoin_from(RuleInfo, DataSourceType, RuleInfo.data_source_type_id == DataSourceType.id)\.filter(base_filter, query_rule)# 聚合分组查询不同校验结果数量# 子查询, 需要通过sub.c 获取父查询中的labelsub = base_join_select.subquery()group_by_select = select(sub.c.check_result, func.count(sub.c.check_result))\.group_by(sub.c.check_result)state_count_results = await session.execute(group_by_select)# join关联查询需要使用fetchall()方法获取所有查询内容state_count_rows = state_count_results.fetchall()state_info = {}for row in state_count_rows:state_info[row[0]] = row[1]# join关联并分页查询join_pagination_select = select(CheckResult.check_result,RuleInfo.id,RuleInfo.rule_name)\.outerjoin_from(CheckResult, RuleInfo, RuleInfo.id == CheckResult.rule_id).filter(base_filter, query_rule)\.limit(page_size).offset((int(page_num) - 1) * page_size )results = await session.execute(join_pagination_select)all_rows = results.fetchall()list_info = []for row in all_rows:result_info = {}result_info['check_result'] = row[0]result_info['id'] = row[1]result_info['rule_name'] = row[2]list_info.append(result_info)return total, list_info, state_info

修改

    @classmethod@provide_sessionasync def update_template(cls, session=None, **kwargs):"""编辑规则模板"""id = kwargs.get('id')template_name = kwargs.get('template_name')async with session() as session:# 查询模板对象使用的与接下来修改使用的是同一个session 中# 保证一致性rule_template = await cls.get_rule_template_by_id(id, session=session)for k, v in kwargs.items():if v is None or k == 'id':continuesetattr(rule_template, k, v)await session.commit()

异步sqlalchemy ORM session使用总结相关推荐

  1. 解决 “_pickle.PicklingError: Can‘t pickle: it‘s not the same object as sqlalchemy.orm.session.Session“

    问题背景 在一个使用 fastapi 框架的 web 项目调试过程中,遇到了一个有关多进程参数序列化的问题.session对象作为参数传子进程时时报"_pickle.PicklingErro ...

  2. python 之路,Day11 (下)- sqlalchemy ORM

    python 之路,Day11 - sqlalchemy ORM 本节内容 ORM介绍 sqlalchemy安装 sqlalchemy基本使用 多外键关联 多对多关系 表结构设计作业 1. ORM介绍 ...

  3. sqlalchemy.orm 多条件查询更新

    sqlalchemy.orm 多条件datas = session.query(Stock).filter(and_(Stock.now_price<25,Stock.type==0,Stock ...

  4. python之SQLAlchemy ORM

    前言: 这篇博客主要介绍下SQLAlchemy及基本操作,写完后有空做个堡垒机小项目.有兴趣可看下python之数据库(mysql)操作.下篇博客整理写篇关于Web框架和django基础~~ 一.OR ...

  5. 转载--SqlAlchemy ORM 学习

    转载原文地址:http://blog.csdn.net/yueguanghaidao/article/details/7485345,http://blog.csdn.net/yueguanghaid ...

  6. Python sqlalchemy orm 多外键关联

     多外键关联 注:在两个表之间进行多外键链接 如图: 案例: # 创建两张表并添加外键主键 # 调用Column创建字段 加类型 from sqlalchemy import Integer, For ...

  7. SQLAlchemy ORM教程之三:Relationship

    建立关系 之前我们已经建立了一个用户(User)表,现在我们来考虑增加一个与用户关联的新的表.在我们的系统里面,用户可以存储多个与之相关的email地址.这是一种基本的一对多的关系.我们把这个新增加的 ...

  8. SQLAlchemy ORM教程之二:Query

    Query Session的query函数会返回一个Query对象.query函数可以接受多种参数类型.可以是类,或者是类的instrumented descriptor.下面的这个例子取出了所有的U ...

  9. SQLAlchemy ORM教程之一:Create

    Object Relational Tutorial 所谓ORM(Object Relational Mapping),就是建立其由Python类到数据库表的映射关系:一个Python实例(insta ...

最新文章

  1. 三,ES6中需要注意的特性(重要)
  2. Python培训教程分享:10款超好用的Python开发工具
  3. Centos7 安装maven3.5.0和git
  4. php中复选框删除数据_checkbox 删除,checkbox_PHP教程
  5. Ubuntu 11.10 安装Adobe Air 和卸载Air中的软件
  6. Win64 驱动内核编程-17. MINIFILTER(文件保护)
  7. kali2020提高权限到root
  8. 中国大学MOOC 编译原理 第8讲测验(计分)
  9. Pytorch(二) --梯度下降法
  10. @Springboot搭建项目controller层接收json格式的对象失败
  11. POJ 1183 反正切函数的应用(数学代换,基本不等式)
  12. Linux Jump Label/static-key机制详解
  13. Netty工作笔记0031---NIO零拷贝应用案例
  14. python安装不上怎么办_python依赖安装失败怎么办
  15. python with as 用法_你在 Python 中常常写的 with..as.. 到底是个啥?
  16. 多头注意力比单头好在哪?
  17. Visual Studio 2013官方简体中文专业版/旗舰版/高级版下载(含激活序列号)
  18. Nginx网页优化(版本、缓存时间、日志分隔、连接超时)
  19. 杂记——贝叶斯可信区间与频率置信区间的区别
  20. 微信公众号申请流程整理

热门文章

  1. python下使用pip freeze requirements.txt命令迁移模块(详细教程)
  2. 各种排序算法比较(java)
  3. unity3D 中的空气墙可以进行缩放吗
  4. 在Dash上使用d3.js
  5. uniqueidentifier属性ID的自动生成
  6. 梦古服务器端公告文件修改,9月8日开放新服以及限制部分服务器角色创建公告...
  7. C语言中的struct用法
  8. ESP32-USB Serial/JTAG Controller使用
  9. intellij idea 使用Tomcat部署的项目在哪里,为什么不在Tomcat的webapps目录下面
  10. catia圆型与矩形的桥接_4.3.7.2-桥接曲面_复杂构面