文章目录

  • 1. 环境安装
  • 2. 使用SQLAlchemy与SQL数据库通信
    • 2.1 创建表
    • 2.2 连接数据库
    • 2.3 insert、select
    • 2.4 update、delete
    • 2.5 relationships
    • 2.6 用Alembic进行数据库迁移

learn from 《Building Data Science Applications with FastAPI》

1. 环境安装

docker 安装 MongoDB 服务

 docker run -d --name fastapi-mongo -p 27017:27017 mongo:4.4

2. 使用SQLAlchemy与SQL数据库通信

安装 pip install databases[sqlite]

2.1 创建表

# models.pyimport sqlalchemy
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Fieldmetadata = sqlalchemy.MetaData()  # 创建元数据对象posts = sqlalchemy.Table(  # 创建表对象'posts',  # 表名metadata,  # 元数据对象# 列对象(列名,类型,其他选项)sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True, autoincrement=True),sqlalchemy.Column('publication_date', sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column('title', sqlalchemy.String(255), nullable=False),sqlalchemy.Column('text', sqlalchemy.Text(), nullable=False),
)class PostBase(BaseModel):title: strtext: strpublication_date: datetime = Field(dafault_factory=datetime.now)class PostPartialUpdate(BaseModel):text: Optional[str] = Nonecontent: Optional[str] = Noneclass PostCreate(PostBase):passclass PostDB(PostBase):id: int

2.2 连接数据库

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:28
# @Author : Michael
# @File : database.py
# @desc :
import sqlalchemy
from databases import Database
DB_URL = 'sqlite:///cp6_sqlalchemy.db'
database = Database(DB_URL)
sqlalchemy_engine = sqlalchemy.create_engine(DB_URL)def get_database() -> Database:return database

2.3 insert、select

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Tuple
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdateapp = FastAPI()@app.on_event('startup') # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown") # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0),) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostDB:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)return PostDB(**raw_post)# 开始插入数据
@app.post("/posts/", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostDB:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database),) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return resultsif __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)



2.4 update、delete

# update
@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(post_update: PostPartialUpdate,post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostDB:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db

# delete
@app.delete("/posts/{id}",status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostDB = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)

2.5 relationships

models.py 编写新的表

comments = sqlalchemy.Table("comments",metadata,sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),# 定义连接的外键sqlalchemy.Column("post_id", sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"), nullable=False),sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),
)class CommentBase(BaseModel):post_id: intpublication_date: datetime = Field(default_factory=datetime.now)content: strclass CommentCreate(CommentBase):passclass CommentDB(CommentBase):id: int

app.py 添加内容

from typing import List, Mapping, Tuple, cast
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:# 选取post表单数据select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {id} does not exist")# 插入comment 语句insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)# 查询 commentselect_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)


获取一个post的全部comments

models.py

class PostPublic(PostDB):comments: Optional[List[CommentDB]] = None

app.py

# _*_ coding: utf-8 _*_
# @Time : 2022/3/8 9:40
# @Author : Michael
# @File : app.py
# @desc :from typing import List, Mapping, Tuple, cast
import uvicorn
from databases import Database
from fastapi import Depends, FastAPI, HTTPException, Query, statusfrom database import get_database, sqlalchemy_engine
from models import metadata, posts, PostDB, PostCreate, PostPartialUpdate, comments, CommentCreate, CommentDB, \PostPublicapp = FastAPI()@app.on_event('startup')  # 启动的时候执行数据库连接
async def startup():await get_database().connect()metadata.create_all(sqlalchemy_engine)@app.on_event("shutdown")  # 关闭的时候执行数据库断开连接
async def shutdown():await get_database().disconnect()async def pagination(skip: int = Query(0, ge=0),limit: int = Query(10, ge=0), ) -> Tuple[int, int]:capped_limit = min(100, limit)return (skip, capped_limit)async def get_post_or_404(id: int, database: Database = Depends(get_database)) -> PostPublic:select_query = posts.select().where(posts.c.id == id)raw_post = await database.fetch_one(select_query)if raw_post is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)# 编号为id的post的所有commentsselect_post_comment_query = comments.select().where(comments.c.post_id == id)raw_comments = await database.fetch_all(select_post_comment_query)comments_list = [CommentDB(**row) for row in raw_comments]return PostPublic(**raw_post, comments=comments_list)# 开始插入数据
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate, db: Database = Depends(get_database)) -> PostPublic:# 创建插入语句,不必手写sqlinsert_query = posts.insert().values(post.dict())# 执行插入语句命令post_id = await db.execute(insert_query)post_db = await get_post_or_404(post_id, db)return post_db@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results@app.get("/posts/{id}", response_model=PostPublic)
async def get_post(post: PostPublic = Depends(get_post_or_404)) -> PostPublic:return post@app.get("/posts")
async def list_posts(pagination: Tuple[int, int] = Depends(pagination),database: Database = Depends(get_database), ) -> List[PostDB]:skip, limit = paginationselect_query = posts.select().offset(skip).limit(limit)rows = await database.fetch_all(select_query)results = [PostDB(**row) for row in rows]return results# update
@app.patch("/posts/{id}", response_model=PostPublic)
async def update_post(post_update: PostPartialUpdate,post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> PostPublic:update_query = (posts.update().where(posts.c.id == post.id).values(post_update.dict(exclude_unset=True)))await database.execute(update_query)post_db = await get_post_or_404(post.id, database)return post_db# delete
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostPublic = Depends(get_post_or_404),database: Database = Depends(get_database)) -> None:delete_query = posts.delete().where(posts.c.id == post.id)await database.execute(delete_query)@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)
async def create_comment(comment: CommentCreate, database: Database = Depends(get_database)
) -> CommentDB:select_post_query = posts.select().where(posts.c.id == comment.post_id)post = await database.fetch_one(select_post_query)if post is None:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {comment.post_id} does not exist")insert_query = comments.insert().values(comment.dict())comment_id = await database.execute(insert_query)select_query = comments.select().where(comments.c.id == comment_id)raw_comment = cast(Mapping, await database.fetch_one(select_query))return CommentDB(**raw_comment)if __name__ == '__main__':uvicorn.run(app='app:app', host="127.0.0.1", port=8001, reload=True, debug=True)


2.6 用Alembic进行数据库迁移

pip install alembic

终端输入:

alembic init alembic

初始化迁移环境,其中包括一组文件和目录,Alembic将在其中存储其配置和迁移文件,需要一起提交 git

在 env.py 中导入元数据

from web_python_dev.sqlalchemy1.models import metadatatarget_metadata = metadata

编辑ini配置

开始迁移

alembic revision --autogenerate -m "Initial migration"

之后会生成一个py文件

该代码内有两个函数:upgradedowngrade用于数据迁移和回滚

# 升级
alembic upgrade head

数据的迁移和升级之前请做好备份和测试,防止丢失损坏
https://alembic.sqlalchemy.org/en/latest/index.html

数据库和ORMS:使用SQLAlchemy与数据库通信相关推荐

  1. python 笔记 之 sqlalchemy操作数据库-创建表

    2019独角兽企业重金招聘Python工程师标准>>> ''' pip install SQLAlchemy 操作数据库-创建表 ''' import sqlalchemy''' s ...

  2. python sql 日期查询_Python--flask使用 SQLAlchemy查询数据库最近时间段或之前的数据...

    Python--flask使用 SQLAlchemy查询数据库最近时间段或之前的数据 博客说明 文章所涉及的资料来自互联网整理和个人总结,意在于个人学习和经验汇总,如有什么地方侵权,请联系本人删除,谢 ...

  3. pymsql 与 SQLAlchemy 操作数据库的区别

    pymsql 与 SQLAlchemy 操作数据库的区别 pymsql 1.是获得连接对象 2.是获取连接对象 SQLAlchemy 1.是数据库连接配置 2.是通过配置创建引擎 3.是通过引擎创建数 ...

  4. SQLAlchemy 操作数据库

    SQLAlchemy 操作数据库 SQLAlchemy为Python提供了不同数据库的统一接口,采用ORM的方式操作数据库,简洁优雅 一.安装 直接通过pip安装即可 pip install sqla ...

  5. sqlalchemy 初始化数据库

    初始化数据库 方法1 from sqlalchemy import create_engine, MetaDataengine = create_engine('sqlite:///foo.db', ...

  6. AntDB数据库与DSG强强联手,助力通信行业核心系统国产化

    日前,湖南亚信安慧科技有限公司(简称:亚信安慧科技)与迪思杰(北京)数据管理技术有限公司开展了产品兼容互认工作. 图1:AntDB数据库与DSG SuperSync V3.2兼容互认证明 近年来,国家 ...

  7. SqlAlchemy Alembic数据库升级与降级简易教程

    前言 通常我们会将我们的代码放入到某个VCS(版本控制系统)中,进行可追溯的版本管理.一个项目除了代码,通常还会有一个数据库,这个数据库可能会随着项目的演进发生变化,甚至需要可以回滚到过去的某个状态, ...

  8. 自制预防校园暴力的智能监控系统:远程加载表格数据,SQLAlchemy操作数据库,云服务器(CentOS)

    本文是[小码哥李明杰老师]指导完成的山东大学引航计划公益人工智能科研实训项目. 自制预防校园暴力的智能监控系统 重定向路径 路由配置(访问根路径,直接重定向到main): export default ...

  9. 服务器怎么打开数据库文件夹,服务器怎么打开数据库文件夹下

    服务器怎么打开数据库文件夹下 内容精选 换一换 GaussDB(for MySQL)全兼容MySQL协议,因此,连接GaussDB(for MySQL)实例目前有两种方式:普通连接和SSL连接.其中, ...

  10. mysql数据库导出最大值_4.6 MySQL数据库导入与导出攻略

    4.6 MySQL数据库导入与导出攻略 4.6.1 Linux下MySQL数据库导入与导出 1. MySQL数据库的导出命令参数 主要是通过两个mysql和mysqldump命令来执行 (1) MyS ...

最新文章

  1. 还是贪心(结构体排序)
  2. 最新卡通渲染效果图(附带一张次世代帅哥)
  3. python 递归 写平方_Python算法:推导、递归和规约
  4. Nagios(一)——LAMP 环境搭建
  5. matlab如何判断一个文件夹里面是否包含某个含有部分文件名的文件_如何构建一个成功的AI PoC(概念验证项目)...
  6. 编译JAVA的错误: 编码ascii的不可映射字符
  7. 用微信公众号控制ESP8266的LED,进一步使用微信当遥控器
  8. python按顺序执行函数_Python3的unittest用例按编写顺序执行
  9. 在GEE平台提取Sentinel-1 SAR GRD的VV+VH波段
  10. 基于PaaS人事部门间平台多重身份的技术解决方案
  11. 西部狂徒自建服务器,在《西部狂徒》中如何快速建立自己根据地?杀人放火是上策...
  12. Excel 轻松制作 二级联动 下拉列表清单
  13. VUE2使用的JSON编辑器
  14. 欢聚时代YY/测试实习面试
  15. Java 字节码技术:不积细流,无以成江河
  16. 关于indexOf的全等匹配
  17. NLP-二分类的应用-区分外卖评论好评/差评
  18. flink-streaming-platform-web 源码解读
  19. 银行借ApplePay反攻 七方利益分润未解
  20. XV6源码解读:安装与编译

热门文章

  1. Linux 监控命令之 netstat
  2. php多文件上传存储到表,PHP 实现一种多文件上传的方法
  3. vue router传参_新手使用vue-router传参时注意事项
  4. viewsource和viewparsed_Network Panel说明
  5. 您的apple id 暂时不符合使用此应用程序_Mac相机不工作时该怎么办
  6. 【Python基础入门系列】第09天:Python tuple
  7. HALCON常用算子(HALCON13.0)
  8. VS2010中 C++创建DLL图解
  9. c语言结构体指针初始化
  10. Ubuntu16.04通过GPT挂载硬盘