Python:peewee常用操作CRUD
Defining models is similar to Django or SQLAlchemy
译文:定义模型类似于Django或SQLAlchemy
目录
- 1、数据库 Database
- 1.1、设置参数
- 1.2、连接数据库
- 1.3、执行原生sql
- 2、模型 Model
- 2.1、定义模型
- 2.2、表操作
- 3、模型的CURD操作
- 3.1、写入操作
- 3.2、更新数据
- 3.3、删除数据
- 3.4、取单条数据
- 3.5、取多条数据
- 4、排序分组统计
- 1、排序
- 4.2、分页
- 4.3、统计
- 4.4、分组
- 4.5、分组统计
文档
- github: https://github.com/coleifer/peewee
- 官方文档:http://docs.peewee-orm.com/
- pypi https://pypi.org/project/peewee/
示例代码仓库
https://github.com/mouday/peewee-demo
安装
pip install peewee
测试环境
$ python --version
Python 3.7.0$ pip show peewee
Name: peewee
Version: 3.15.3
1、数据库 Database
1.1、设置参数
# -*- coding: utf-8 -*-
"""
@File : database.py
"""from peewee import SqliteDatabase
import logging# 设置数据库
db = SqliteDatabase("demo.db")# 打印日志
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False # 不向上传播
1.2、连接数据库
from app.database import db# 链接数据库
db.connect()# 断开数据库
if not db.is_closed():db.close()
1.3、执行原生sql
获取多条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
rows = cursor.fetchall()
print(rows)
[(1, 'Jack', 23, '2022-10-19 18:09:07.038935', '2022-10-19 18:09:07.038940')
]
获取单条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,))# 将返回结果转换为dict
# https://docs.python.org/zh-cn/3.6/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):"""将返回结果转换为dict"""d = {}for idx, col in enumerate(cursor.description):d[col[0]] = row[idx]return dcursor.row_factory = dict_factory
row = cursor.fetchone()
print(row)
{'id': 1, 'name': 'Jack', 'age': 23, 'created_time': '2022-10-19 18:09:07.038935', 'update_time': '2022-10-19 18:09:07.038940'
}
2、模型 Model
2.1、定义模型
定义基类模型
# -*- coding: utf-8 -*-
"""
@File : base_model.py
"""from peewee import Modelfrom app.database import dbclass BaseModel(Model):"""# 基类,设置数据库链接"""class Meta:database = db
定义模型
# -*- coding: utf-8 -*-
"""
@File : user_model.py
"""from datetime import datetimefrom peewee import CharField, DateTimeField, IntegerField, AutoFieldfrom app.model.base_model import BaseModelclass UserModel(BaseModel):"""用户表"""id = AutoField()name = CharField(null=False)age = IntegerField(null=False)created_time = DateTimeField(default=datetime.now)update_time = DateTimeField(default=datetime.now)class Meta:# 指定表名table_name = 'tb_user'
2.2、表操作
建表
UserModel.create_table()
('CREATE TABLE IF NOT EXISTS "tb_user" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL, "age" INTEGER NOT NULL, "created_time" DATETIME NOT NULL, "update_time" DATETIME NOT NULL)', []
)
查看表是否存在
UserModel.table_exists()
('SELECT name FROM "main".sqlite_master WHERE type=? ORDER BY name',('table',))
删除表
UserModel.drop_table()
('DROP TABLE IF EXISTS "tb_user"', []
)
3、模型的CURD操作
3.1、写入操作
插入数据
ret = UserModel.insert({UserModel.age: 20,UserModel.name: 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
['Tom', 20, datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]
插入字典数据
ret = UserModel.insert({'age': 20,'name': 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
['Tom', 20, datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]
保存实例
user = UserModel(age=21,name='Tom'
)user.save()
('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)',
['Charlie', 12,
datetime.datetime(2022, 10, 19, 17, 34, 43, 376650),
datetime.datetime(2022, 10, 19, 17, 34, 43, 376652)])
插入并创建实例
user = UserModel.create(age=22,name='Tom'
)
('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', ['Charlie', 12, datetime.datetime(2022, 10, 19, 17, 36, 16, 408224), datetime.datetime(2022, 10, 19, 17, 36, 16, 408226)])
插入多条数据
UserModel.insert_many([{'age': 23,'name': 'Tom'},{'age': 24,'name': 'Tom'}
]).execute()
('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?), (?, ?, ?, ?)', ['Tom', 23, datetime.datetime(2022, 10, 19, 17, 38, 48, 106336), datetime.datetime(2022, 10, 19, 17, 38, 48, 106344), 'Tom', 24, datetime.datetime(2022, 10, 19, 17, 38, 48, 106355), datetime.datetime(2022, 10, 19, 17, 38, 48, 106360)])
分块插入,忽略重复数据
from peewee import chunked# Insert rows 100 at a time.
with db.atomic():for batch in chunked(data_source, 100):MyModel.insert_many(batch).on_conflict_ignore().execute()
3.2、更新数据
更新多条数据
UserModel.update(name='Jack'
).where(UserModel.id == 1
).execute()
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])
更新单条数据
UserModel.set_by_id(1, {'name': 'Jack'})
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])
3.3、删除数据
按照主键删除
UserModel.delete_by_id(1)
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
按条件删除
UserModel.delete().where(UserModel.id == 1
).execute()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
删除实例
user = UserModel.get_by_id(1)user.delete_instance()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
清空表数据
UserModel.truncate_table()
('DELETE FROM "tb_user"', [])
3.4、取单条数据
条件查询一条
row = UserModel.select().where(UserModel.name == 'Tom'
).get()print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Tom', 1, 0])
获取第一条
row = UserModel.select().where(UserModel.name == 'Tom'
).first()print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ?', ['Tom', 1])
通过获取,不存在报错
row = UserModel.get(UserModel.name == 'Tom')
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Tom', 1, 0])
通过获取或者返回None
user = UserModel.get_or_none(UserModel.name == 'Jack')
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Jack', 1, 0])
通过主键获取,不存在报错
user = UserModel.get_by_id(1)print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])
获取或创建
UserModel.get_or_create(name='Tom', age=23)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE (("t1"."name" = ?) AND ("t1"."age" = ?)) LIMIT ? OFFSET ?', ['Tom', 23, 1, 0])('BEGIN', None)('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', ['Tom', 23, datetime.datetime(2022, 10, 19, 18, 9, 7, 38935), datetime.datetime(2022, 10, 19, 18, 9, 7, 38940)])
3.5、取多条数据
查询多条记录
# 注意,获取的是 iterator
# 可以转为 namedtuples(), tuples(), dicts()query = UserModel.select().where(UserModel.name == 'Tom'
)print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?)', ['Tom'])
4、排序分组统计
1、排序
query = UserModel.select().where(UserModel.name == 'Tom'
).order_by(UserModel.age.desc())print(list(query))
# [<UserModel: 1>]
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) ORDER BY "t1"."age" DESC', ['Tom'])
4.2、分页
query = UserModel.select().paginate(2, 10)print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" LIMIT ? OFFSET ?', [10, 10])
4.3、统计
query = UserModel.select().count()
print(list(query))
('SELECT COUNT(1) FROM (SELECT 1 FROM "tb_user" AS "t1") AS "_wrapped"', [])
4.4、分组
query = UserModel.select().group_by(UserModel.name)print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" GROUP BY "t1"."name"', [])
4.5、分组统计
query = UserModel.select(UserModel.group_id,fn.COUNT().alias('count')).group_by(UserModel.group_id)print(list(query))
('SELECT "t1"."group_id", COUNT() AS "count" FROM "tb_user" AS "t1" GROUP BY "t1"."group_id"',
[])
Python:peewee常用操作CRUD相关推荐
- python列表常用操作函数_Python入门——列表常用操作
Python列表常用操作 准备 控制台输入ipython parallels@parallels-vm:~$ ipythonPython 2.7.12 (default, Dec 4 2017, 14 ...
- python numpy常用操作、Numpy 多维数组、矩阵相乘、矩阵乘以向量
python numpy常用操作 Numpy基本操作 # 导入numpy import numpy as np # 生成numpy数组 x = np.array([1.0, 2.0, 3.0]) pr ...
- python列表常用の操作
python列表常用の方法 列表操作: 1.创建列表: 向列表添加元素: [i for i in range(1,10)]列表解析式: #将 1-10 每个数乘以 2 放入一个列表: >> ...
- Python列表常用操作,浅拷贝及深拷贝
先看一些基本的操作 names = ["4ZhangYang", "#!Guyun","xXiangPeng",["alex&qu ...
- python 列表 常用操作 列表的循环遍历
目录 列表格式 常用操作 查找 判断是否存在 添加数据 删除数据 修改数据 复制 列表的循环遍历 列表嵌套 列表为可变类型数据 列表格式 [数据1,数据2,数据3,.......] 注:列表可以一次性 ...
- 真香!精心整理了 100+Python 字符串常用操作
来源丨萝卜大杂烩 作者丨周萝卜 字符串作为平时使用最多的数据类型,其常用的操作我们还是很有必要熟记于心的,本文整理了多种字符串的操作的案例,还是非常用心,记得点赞收藏~ 字符串切片操作 test = ...
- 精心整理了100+Python字符串常用操作,备用
字符串作为平时使用最多的数据类型,其常用的操作我们还是很有必要熟记于心的,本文整理了多种字符串的操作的案例,还是非常用心,记得点赞收藏哦 文章很长,高低要忍一下,如果忍不了,那就收藏吧,总会用到的 萝 ...
- 蓝桥杯软件类竞赛--Python的常用操作示例
算法教材:<算法竞赛入门到进阶> 清华大学出版社 网购:京东 当当 作者签名书(有发票):点我 有建议请加QQ 群:567554289 文章目录 1. for循环 2. sort()和 ...
- python 列表常用操作(二)
1.tuple 的 unpack a,b = t 2.格式化输出 print('您的输入:{},值为{}',format(a,b)) 3.日期计算 import datetime as dt impo ...
最新文章
- Java每日一讲讲什么好_撩课-Java每天10道面试题第1天
- 分布式一致性(共识)算法(Paxos,raft,ZAB)的一些总结
- android使用桢布局,Android性能优化UI篇
- pdf转换为word问题
- python默认参数举例_Python之在函数中使用列表作为默认参数
- Oracle迁移索引
- matlab 的cat函数
- React 新 Context API 在前端状态管理的实践
- 深入分析Ribbon源码分析
- matlab thetal,基於matlab的車道和車道線檢測樣例
- 问答| 为什么汽车会采用前轮转向,后轮驱动的方式?为什么反过来的搭配方式很少见?
- python封装数据库操作_Python3 数据库操作小封装
- MFC 对话框 添加 工具栏
- SWUST OJ 190: 游程编码
- MySQL懒人管理工具-NaVicat
- MIC灵敏度, MIC动态范围下限值估计, -3dB, dB加减
- muduo源码学习 Day03
- 神气的Android Studio -Pligins什么都没有
- PLC远程监控与数据采集方案(手机APP)
- 中医药学概论(复习详解)
热门文章
- php实现addon安装卸载,插件Addon文件
- 《数据结构》专题10--最短路
- www.us258.com 这家主机商相当无耻,用着不到一个月给我停了,连用户名都删了。...
- 2021年前端关注的8个技术趋势
- you-get和youtube-dl下载全网视频
- 最小攻击超过最大攻击_贪婪洞窟攻击力解析 最大攻击与最小攻击对比分析
- 挑逗新兴科技创业圈兴奋中枢,保险极客“百万医疗”搞事情
- Mysql查询各科成绩前三名并分别排序
- Java开发 微软OAuth身份验证在EWS中的应用
- 人工智能现在的技术“好玩”到了什么程度?