数据库和ORMS:使用SQLAlchemy与数据库通信
文章目录
- 1. 环境安装
- 2. 使用SQLAlchemy与SQL数据库通信
- 2.1 创建表
- 2.2 连接数据库
- 2.3 insert、select
- 2.4 update、delete
- 2.5 relationships
- 2.6 用Alembic进行数据库迁移
learn from 《Building Data Science Applications with FastAPI》
1. 环境安装
docker 安装 MongoDB 服务
docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4
2. 使用SQLAlchemy与SQL数据库通信
安装 pip install databases[sqlite]
2.1 创建表
# models.pyimport sqlalchemy
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Fieldmetadata = sqlalchemy.MetaData() # 创建元数据对象posts = sqlalchemy.Table( # 创建表对象'posts', # 表名metadata, # 元数据对象# 列对象(列名,类型,其他选项)sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True, autoincrement=True),sqlalchemy.Column('publication_date', sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column('title', sqlalchemy.String(255), nullable=False),sqlalchemy.Column('text', sqlalchemy.Text(), nullable=False),
)class PostBase(BaseModel):title: strtext: strpublication_date: datetime = Field(dafault_factory=datetime.now)class PostPartialUpdate(BaseModel):text: Optional[str] = Nonecontent: Optional[str] = Noneclass PostCreate(PostBase):passclass PostDB(PostBase):id: int
2.2 连接数据库
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:28
# @Author : Michael
# @File : database.py
# @desc :
import sqlalchemy
from databases import Database
DB_URL = 'sqlite:///cp6_sqlalchemy.db'
database = Database(DB_URL)
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)def get_database() -> Database:return database
2.3 insert、select
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Tuple
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdateapp = FastAPI()@app.on_event('startup') # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown") # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0),) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostDB:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)return PostDB(**raw_post)# 开始插入数据
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostDB:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database),) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return resultsif __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)
2.4 update、delete
# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(post_update: PostPartialUpdate,post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostDB:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db
# delete
@app.delete("/posts/{id}",status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)
2.5 relationships
models.py 编写新的表
comments = sqlalchemy.Table("comments",metadata,sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),# 定义连接的外键sqlalchemy.Column("post_id", sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"), nullable=False),sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),
)class CommentBase(BaseModel):post_id: intpublication_date: datetime = Field(default_factory=datetime.now)content: strclass CommentCreate(CommentBase):passclass CommentDB(CommentBase):id: int
app.py 添加内容
from typing import List, Mapping, Tuple, cast
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:# 选取post表单数据select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {id} does not exist")# 插入comment 语句insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)# 查询 commentselect_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)
获取一个post
的全部comments
models.py
class PostPublic(PostDB):comments: Optional[List[CommentDB]] = None
app.py
# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Mapping, Tuple, cast
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB, \PostPublicapp = FastAPI()@app.on_event('startup') # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown") # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0), ) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostPublic:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)# 编号为id的post的所有commentsselect_post_comment_query = comments.select().where(comments.c.post_id == id)raw_comments = await database.fetch_all(select_post_comment_query)comments_list = [CommentDB(**row) for row in raw_comments]return PostPublic(**raw_post, comments=comments_list)# 开始插入数据
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostPublic:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(post_update: PostPartialUpdate,post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostPublic:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {comment.post_id} does not exist")insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)select_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)if __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)
2.6 用Alembic进行数据库迁移
pip install alembic
终端输入:
alembic init alembic
初始化迁移环境,其中包括一组文件和目录,Alembic
将在其中存储其配置和迁移文件,需要一起提交 git
在 env.py 中导入元数据
from web_python_dev.sqlalchemy1.models import metadatatarget_metadata = metadata
编辑ini
配置
开始迁移
alembic revision --autogenerate -m "Initial migration"
之后会生成一个py文件
该代码内有两个函数:upgrade
,downgrade
用于数据迁移和回滚
# 升级
alembic upgrade head
数据的迁移和升级之前请做好备份和测试,防止丢失损坏
https://alembic.sqlalchemy.org/en/latest/index.html
数据库和ORMS:使用SQLAlchemy与数据库通信相关推荐
- python 笔记 之 sqlalchemy操作数据库-创建表
2019独角兽企业重金招聘Python工程师标准>>> ''' pip install SQLAlchemy 操作数据库-创建表 ''' import sqlalchemy''' s ...
- python sql 日期查询_Python--flask使用 SQLAlchemy查询数据库最近时间段或之前的数据...
Python--flask使用 SQLAlchemy查询数据库最近时间段或之前的数据 博客说明 文章所涉及的资料来自互联网整理和个人总结,意在于个人学习和经验汇总,如有什么地方侵权,请联系本人删除,谢 ...
- pymsql 与 SQLAlchemy 操作数据库的区别
pymsql 与 SQLAlchemy 操作数据库的区别 pymsql 1.是获得连接对象 2.是获取连接对象 SQLAlchemy 1.是数据库连接配置 2.是通过配置创建引擎 3.是通过引擎创建数 ...
- SQLAlchemy 操作数据库
SQLAlchemy 操作数据库 SQLAlchemy为Python提供了不同数据库的统一接口,采用ORM的方式操作数据库,简洁优雅 一.安装 直接通过pip安装即可 pip install sqla ...
- sqlalchemy 初始化数据库
初始化数据库 方法1 from sqlalchemy import create_engine, MetaDataengine = create_engine('sqlite:///foo.db', ...
- AntDB数据库与DSG强强联手,助力通信行业核心系统国产化
日前,湖南亚信安慧科技有限公司(简称:亚信安慧科技)与迪思杰(北京)数据管理技术有限公司开展了产品兼容互认工作. 图1:AntDB数据库与DSG SuperSync V3.2兼容互认证明 近年来,国家 ...
- SqlAlchemy Alembic数据库升级与降级简易教程
前言 通常我们会将我们的代码放入到某个VCS(版本控制系统)中,进行可追溯的版本管理.一个项目除了代码,通常还会有一个数据库,这个数据库可能会随着项目的演进发生变化,甚至需要可以回滚到过去的某个状态, ...
- 自制预防校园暴力的智能监控系统:远程加载表格数据,SQLAlchemy操作数据库,云服务器(CentOS)
本文是[小码哥李明杰老师]指导完成的山东大学引航计划公益人工智能科研实训项目. 自制预防校园暴力的智能监控系统 重定向路径 路由配置(访问根路径,直接重定向到main): export default ...
- 服务器怎么打开数据库文件夹,服务器怎么打开数据库文件夹下
服务器怎么打开数据库文件夹下 内容精选 换一换 GaussDB(for MySQL)全兼容MySQL协议,因此,连接GaussDB(for MySQL)实例目前有两种方式:普通连接和SSL连接.其中, ...
- mysql数据库导出最大值_4.6 MySQL数据库导入与导出攻略
4.6 MySQL数据库导入与导出攻略 4.6.1 Linux下MySQL数据库导入与导出 1. MySQL数据库的导出命令参数 主要是通过两个mysql和mysqldump命令来执行 (1) MyS ...
最新文章
- 还是贪心(结构体排序)
- 最新卡通渲染效果图(附带一张次世代帅哥)
- python 递归 写平方_Python算法:推导、递归和规约
- Nagios(一)——LAMP 环境搭建
- matlab如何判断一个文件夹里面是否包含某个含有部分文件名的文件_如何构建一个成功的AI PoC(概念验证项目)...
- 编译JAVA的错误: 编码ascii的不可映射字符
- 用微信公众号控制ESP8266的LED,进一步使用微信当遥控器
- python按顺序执行函数_Python3的unittest用例按编写顺序执行
- 在GEE平台提取Sentinel-1 SAR GRD的VV+VH波段
- 基于PaaS人事部门间平台多重身份的技术解决方案
- 西部狂徒自建服务器,在《西部狂徒》中如何快速建立自己根据地?杀人放火是上策...
- Excel 轻松制作 二级联动 下拉列表清单
- VUE2使用的JSON编辑器
- 欢聚时代YY/测试实习面试
- Java 字节码技术:不积细流,无以成江河
- 关于indexOf的全等匹配
- NLP-二分类的应用-区分外卖评论好评/差评
- flink-streaming-platform-web 源码解读
- 银行借ApplePay反攻 七方利益分润未解
- XV6源码解读:安装与编译
热门文章
- Linux 监控命令之 netstat
- php多文件上传存储到表,PHP 实现一种多文件上传的方法
- vue router传参_新手使用vue-router传参时注意事项
- viewsource和viewparsed_Network Panel说明
- 您的apple id 暂时不符合使用此应用程序_Mac相机不工作时该怎么办
- 【Python基础入门系列】第09天:Python tuple
- HALCON常用算子(HALCON13.0)
- VS2010中 C++创建DLL图解
- c语言结构体指针初始化
- Ubuntu16.04通过GPT挂载硬盘