之前的一篇:python︱mysql数据库连接——pyodbc


文章目录

  • 0 安装依赖
  • 1 连接方式
    • 1.1 pymysql.connect直接连
    • 1.2 pandas连接
    • 1.3 dbutils+PooledDB连接
  • 2 读/写/改
    • 2.1 常规查询
    • 2.2 常规-写入
    • 2.3 常规-批量写入
    • 2.4 常规-更新
    • 2.5 常规-删除
    • 2.6 pandas写回——to_sql
    • 2.6.0 sqlalchemy的格式
    • 2.7 pandas 读出——read_sql
    • 2.8 SQL + pandas 来创建表结构
    • 2.9 更新时间格式
    • 2.10 to_sql 和常规insert的优劣势
  • 3 其他基础设置
    • 3.1 更新注释
    • 3.2 批量修改字符串类型
    • 3.3 查看表名 + 列名
    • 3.4 指定唯一KEY
    • 3.5 left / right/inner Join 连接
  • 4 mysql文字查询
    • 4.1 通配符查询 like
    • 4.2 多字段模糊匹配:
    • 4.3 正则模糊匹配
    • 4.4 多个关键词匹配,并集关系(不是 | )
  • 5 报错类型
    • 5.1 报错1:ProgrammingError
    • 5.2 报错二:\xF0\x9F\x92\x9C\xF0\x9F 字符编码报错
    • 5.3 报错三: Incorrect string value
    • 5.4 报错四:not all arguments converted during string formatting
    • 5.5 报错五:DB-Lib error message 20018
    • 5.6 报错六:合并表的时候,编码报错
  • 6 一些笔者的自建函数
    • 6.1 打包查询函数
    • 6.2 DButils的使用
  • 7 一些应用
    • 7.1 时间创建与写入
    • 7.2 利用Pandas快速读入mysql / mmsql

0 安装依赖

pip3 install --pre pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install --pre sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple

1 连接方式

1.1 pymysql.connect直接连

import pymysql
# 连接databaseuser = "xxx"
password = "xxx"
host = 'xx.xx.xx.xx'
port = 3306conn = pymysql.connect(host=host, user=user,password=password,database="xxxxxxx")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()

1.2 pandas连接

参考:利用pandas的to_sql将数据插入MySQL数据库和所踩过的坑

from sqlalchemy import create_engineengine = create_engine("mysql+pymysql://{}:{}@{}/{}?charset={}".format('用户名', '登录密码', '127.0.0.1:3306', '数据库名','字符编码'))
con = engine.connect()#创建连接
df.to_sql(name='rumousdata', con=con, if_exists='append', index=False)

1.3 dbutils+PooledDB连接

文章:python使用dbutils的PooledDB连接池,操作数据库

使用优势:

  • 1、使用dbutils的PooledDB连接池,操作数据库。
    这样就不需要每次执行sql后都关闭数据库连接,频繁的创建连接,消耗时间
  • 2、如果是使用一个连接一直不关闭,多线程下,插入超长字符串到数据库,运行一段时间后很容易出现OperationalError: (2006, ‘MySQL server has gone away’)这个错误。

使用PooledDB解决。

self.__class__.__pool = PooledDB(pymysql,mincached, maxcached,maxshared, maxconnections, blocking,maxusage, setsession, reset,host=host, port=port, db=db,user=user, passwd=passwd,charset=charset,cursorclass=pymysql.cursors.DictCursor)

2 读/写/改

python3.6 使用 pymysql 连接 Mysql 数据库及 简单的增删改查操作

2.1 常规查询

import pymysql  #导入 pymysql#打开数据库连接
db= pymysql.connect(host="localhost",user="root",password="123456",db="test",port=3307)# 使用cursor()方法获取操作游标
cur = db.cursor()#1.查询操作
# 编写sql 查询语句  user 对应我的表名
sql = "select * from user"
try:cur.execute(sql)    #执行sql语句results = cur.fetchall()   #获取查询的所有记录print("id","name","password")#遍历结果for row in results :id = row[0]name = row[1]password = row[2]print(id,name,password)
except Exception as e:raise e
finally:db.close()  #关闭连接

2.2 常规-写入

import pymysql
#2.插入操作
db= pymysql.connect(host="localhost",user="root",password="123456",db="test",port=3307)# 使用cursor()方法获取操作游标
cur = db.cursor()sql_insert ="""insert into user(id,username,password) values(4,'liu','1234')"""try:cur.execute(sql_insert)#提交db.commit()
except Exception as e:#错误回滚db.rollback()
finally:db.close()

2.3 常规-批量写入

# 单条插入
coin_cnts_2 = ['46213000','14000','1952','249','62000','10000','48','6446','121000']for n,cot in tqdm(enumerate(coin_cnts_2),total = len(coin_cnts_2)):sql = 'UPDATE douyin_data_1 SET coin_cnt = {} WHERE id = {} '.format(cot,n+1)mq.execute(sql)    # 批量插入
conn = pymysql.connect(host=mq.host, user=mq.user,password=mq.password,database=mq.database)
cursor = conn.cursor()
sql = 'UPDATE douyin_data_1 SET coin_cnt = %s WHERE id = %s '
data_info = [(cot,n) for n,cot in enumerate(coin_cnts_2)]
%time cursor.executemany(sql, data_info)

插入的速度比较慢

2.4 常规-更新

import pymysql
#3.更新操作
db= pymysql.connect(host="localhost",user="root",password="123456",db="test",port=3307)# 使用cursor()方法获取操作游标
cur = db.cursor()sql_update ="update user set username = '%s' where id = %d"try:cur.execute(sql_update % ("xiongda",3))  #像sql语句传递参数#提交db.commit()
except Exception as e:#错误回滚db.rollback()
finally:db.close()

2.5 常规-删除

import pymysql
#4.删除操作
db= pymysql.connect(host="localhost",user="root",password="123456",db="test",port=3307)# 使用cursor()方法获取操作游标
cur = db.cursor()sql_delete ="delete from user where id = %d"try:cur.execute(sql_delete % (3))  #像sql语句传递参数#提交db.commit()
except Exception as e:#错误回滚db.rollback()
finally:db.close()

2.6 pandas写回——to_sql

pd.to_sql()知道这些就够用了

DataFrame.to_sql(name, con, schema=None,
if_exists='fail', index=True, index_label=None,
chunksize=None, dtype=None, method=None)

参见pandas.to_sql函数,主要有以下几个参数:

  • name: 输出的表名
  • con: 与read_sql中相同,数据库链接
  • if_exits: 三个模式:fail,若表存在,则不输出;replace:若表存在,覆盖原来表里的数据;append:若表存在,将数据写到原表的后面。默认为fail
  • index:是否将df的index单独写到一列中
  • index_label:指定列作为df的index输出,此时index为True
  • chunksize: 同read_sql
  • dtype: 指定列的输出到数据库中的数据类型。字典形式储存:{column_name: sql_dtype}。常见的数据类型有sqlalchemy.types.INTEGER(), sqlalchemy.types.NVARCHAR(),sqlalchemy.Datetime()等,具体数据类型可以参考这里

使用to_sql的方式写回数据库之中。

import pandas as pd
from sqlalchemy import create_engine
yconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8')
pd.io.sql.to_sql(dataframe_name,'form_name', yconnect, schema='your_database_name', if_exists='append')

将数据写入mysql的数据库,但需要先通过sqlalchemy.create_engine建立连接,且字符编码设置为utf8,否则有些latin字符不能处理

  • 第二个参数tablename,form_name,是将导入的数据库中的表名
  • 第四个参数your_database_name是将导入的数据库名字
  • if_exists='append’的意思是,如果表tablename存在,则将数据添加到这个表的后面
    • fail的意思如果表存在,啥也不做
    • replace的意思,如果表存在,删了表,再建立一个新表,把数据插入
    • append的意思,如果表存在,把数据插入,如果表不存在创建一个表!!

字段字符格式的初始化:

df.to_sql(name='table', con=con, if_exists='append', index=False,dtype={'col1':sqlalchemy.types.INTEGER(),'col2':sqlalchemy.types.NVARCHAR(length=255),'col_time':sqlalchemy.DateTime(),'col_bool':sqlalchemy.types.Boolean})


如果不提供dtype,to_sql会自动根据df列的dtype选择默认的数据类型输出,比如字符型会以sqlalchemy.types.TEXT类型输出,相比NVARCHAR,TEXT类型的数据所占的空间更大,所以一般会指定输出为NVARCHAR;
而如果df的列的类型为np.int64时,将会导致无法识别并转换成INTEGER型,需要事先转换成int类型(用map,apply函数可以方便的转换)。

2.6.0 sqlalchemy的格式

自建格式的一些格式要求:

df.to_sql('emp_backup', engine, if_exists='replace', index=False,dtype={'EMP_ID': sqlalchemy.types.BigInteger(),'GENDER': sqlalchemy.types.String(length=20),'AGE': sqlalchemy.types.BigInteger(),'EMAIL':  sqlalchemy.types.String(length=50),'PHONE_NR':  sqlalchemy.types.String(length=50),'EDUCATION':  sqlalchemy.types.String(length=50),'MARITAL_STAT':  sqlalchemy.types.String(length=50),'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()})

其他包括:

from sqlalchemy import exc
from sqlalchemy import schema as sa_schema
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.engine import reflection
from sqlalchemy.sql import compiler
from sqlalchemy.sql import text
from sqlalchemy.types import BIGINT
from sqlalchemy.types import BINARY
from sqlalchemy.types import CHAR
from sqlalchemy.types import DATE
from sqlalchemy.types import DATETIME
from sqlalchemy.types import DECIMAL
from sqlalchemy.types import FLOAT
from sqlalchemy.types import INT  # noqa
from sqlalchemy.types import INTEGER
from sqlalchemy.types import NCHAR
from sqlalchemy.types import NUMERIC
from sqlalchemy.types import NVARCHAR
from sqlalchemy.types import REAL
from sqlalchemy.types import SMALLINT
from sqlalchemy.types import TEXT
from sqlalchemy.types import TIME
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.types import Unicode
from sqlalchemy.types import VARBINARY
from sqlalchemy.types import VARCHAR

2.7 pandas 读出——read_sql

参考:https://blog.csdn.net/ydyang1126/article/details/78222125

# -*- coding: utf-8 -*-
import pandas as pd
import pymysqlconn = pymysql.connect(host='127.0.0.1', \user='root',password='123456', \db='TESTDB',charset='utf8', \use_unicode=True)sql = 'select GroupName from group limit 20'
df = pd.read_sql(sql, con=conn)
print(df.head())df.to_csv("data.csv")
conn.close()

2.8 SQL + pandas 来创建表结构

如果数据源本身是来自数据库,通过脚本操作是比较方便的。如果数据源是来自 CSV 之类的文本文件,可以手写 SQL 语句或者利用 pandas get_schema() 方法,如下例:

import sqlalchemyprint(pd.io.sql.get_schema(df, 'emp_backup', keys='EMP_ID', dtype={'EMP_ID': sqlalchemy.types.BigInteger(),'GENDER': sqlalchemy.types.String(length=20),'AGE': sqlalchemy.types.BigInteger(),'EMAIL':  sqlalchemy.types.String(length=50),'PHONE_NR':  sqlalchemy.types.String(length=50),'EDUCATION':  sqlalchemy.types.String(length=50),'MARITAL_STAT':  sqlalchemy.types.String(length=50),'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()}, con=engine))

get_schema()并不是一个公开的方法,没有文档可以查看。生成的 SQL 语句如下:

CREATE TABLE emp_backup (`EMP_ID` BIGINT NOT NULL AUTO_INCREMENT,`GENDER` VARCHAR(20),`AGE` BIGINT,`EMAIL` VARCHAR(50),`PHONE_NR` VARCHAR(50),`EDUCATION` VARCHAR(50),`MARITAL_STAT` VARCHAR(50),`NR_OF_CHILDREN` BIGINT,CONSTRAINT emp_pk PRIMARY KEY (`EMP_ID`)
)

当然,如果数据源本身就是 mysql,当然不用大费周章来创建数据表的结构,直接使用 create table like xxx 就行。以下代码展示了这种用法:

import pandas as pd
from sqlalchemy import create_engineengine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)# Copy table structure
with engine.connect() as con:con.execute('DROP TABLE if exists emp_backup')con.execute('CREATE TABLE emp_backup LIKE emp_master;')df.to_sql('emp_backup', engine, index=False, if_exists='append')

2.9 更新时间格式

update djk_corpus set create_time='2019-09-29 14:09:45'

2.10 to_sql 和常规insert的优劣势

python的to_sql那点儿事

to_sql结论

  • 可以对齐字段(dataframe的columns和数据库字段一一对齐)
  • 可以缺少字段(dataframe的columns可以比数据库字段少)
  • 不可以多出字段,会报错
    -if_exists='append’进行新增(bug:如果设置了PK,ignore 和 replace会报错)
  • 一定要先创建好数据库,设置好格式,
  • 否则使用if_exists='append’自动创建的字段格式乱七八糟
  • 不能replace!!!!

to_sql代码

#构建数据库连接
engine=create_engine(f'mysql+pymysql://{user}:{passwd}@{host}:3306/{db}')#可以对齐字段,以及缺少字段;不可以增加字段
data.to_sql(sql_name,engine,index=False,if_exists='append')

自定义w_sql (迭代后版本)

# 定义写入数据库函数
def w_sql(sql_name,data,db_name,host=host,user=user,passwd=passwd):zd=""for j in data.columns:zd=zd+j+","connent = pymysql.connect(host=host, user=user, passwd=passwd, db=db_name, charset='utf8mb4') #连接数据库 cursor = connent.cursor()#创建游标for i in data.values:va=""for j in i:if pd.isnull(j):va=va+","+'null' #缺失值判断和转换else:va=va+","+'"'+str(j)+'"'
#         sql=u"""insert ignore into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])sql=u"""replace into %s (%s) values(%s)"""%(sql_name,zd[:-1],va[1:])cursor.execute(sql)connent.commit() #提交事务cursor.close()#关闭游标connent.close()#断开连接

3 其他基础设置

3.1 更新注释

sql = "show full columns from douyin_data_1; "
mq.query(sql)

3.2 批量修改字符串类型

ALTER TABLE `数据集` CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

3.3 查看表名 + 列名

SELECT column_name FROM information_schema.columns
WHERE table_name='xxxx表名';

某个数据库下有多少张表

import pymysql
# 连接databaseuser = "xxx"
password = "xxx"
host = '1.1.1.1'
port = 3306conn = pymysql.connect(host=host, user=user,password=password,database="xxx")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()database  = 'd-lab'
sql = "SELECT table_name FROM information_schema.tables WHERE table_schema='{}' AND table_type='base table';".format(database)
cursor.execute(sql)    # 执行sql语句
cursor.fetchall()    # 获取查询的所有记录

不同表的列名:

'show columns from {}; '.format(form_name)

3.4 指定唯一KEY

import pandas as pd
from sqlalchemy import create_engine
import sqlalchemyengine = create_engine('mysql+pymysql://user:password@localhost/stonetest?charset=utf8')
df = pd.read_sql('emp_master', engine)
# make sure emp_master_backup table has been created
# so the table schema is what we want
df.to_sql('emp_backup', engine, index=False, if_exists='append')

也可以在 to_sql() 方法中,通过 dtype 参数指定字段的类型,然后在 mysql 中 通过 alter table 命令将字段 EMP_ID 变成 primary key。

df.to_sql('emp_backup', engine, if_exists='replace', index=False,dtype={'EMP_ID': sqlalchemy.types.BigInteger(),'GENDER': sqlalchemy.types.String(length=20),'AGE': sqlalchemy.types.BigInteger(),'EMAIL':  sqlalchemy.types.String(length=50),'PHONE_NR':  sqlalchemy.types.String(length=50),'EDUCATION':  sqlalchemy.types.String(length=50),'MARITAL_STAT':  sqlalchemy.types.String(length=50),'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()})with engine.connect() as con:con.execute('ALTER TABLE emp_backup ADD PRIMARY KEY (`EMP_ID`);')

3.5 left / right/inner Join 连接

其中包括:

  • left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录
  • right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录
  • inner join(等值连接) 只返回两个表中联结字段相等的行
select * from A
innerjoin B
on A.aID = B.bID

4 mysql文字查询

最简单的文字匹配

select * FROM xiaohongshu_article_3 WHERE content REGEXP "家居"

4.1 通配符查询 like

通配符查询:
MySql的like语句中的通配符:百分号、下划线和escape

FROM [user] WHERE u_name REGEXP ‘^三’;     # 以三开头的句子
FROM [user] WHERE u_name REGEXP ‘三$’;     # 以三结尾的句子

其中:
%:表示任意个或多个字符。可匹配任意类型和长度的字符。

select * from user where username like '%huxiao';
select * from user where username like 'huxiao%';
select * from user where username like '%huxiao%';
SELECT * FROM [user] WHERE u_name LIKE ‘%三%猫%’

其中,%huxiao,代表句尾
‘%三%猫%’虽然能搜索出“三脚猫”,但不能搜索出符合条件的“张猫三”。

**_:表示任意单个字符。**匹配单个任意字符,它常用来限制表达式的字符长度语句:(可以代表一个中文字符)

select * from user where username like '_';
select * from user where username like 'huxia_';
select * from user where username like 'h_xiao';

如果我就真的要查%或者_,怎么办呢?使用escape,转义字符后面的%或_就不作为通配符了,注意前面没有转义字符的%和_仍然起通配符作用
Sql代码

select username from gg_user where username like '%xiao/_%' escape '/';
select username from gg_user where username like '%xiao/%%' escape '/';

4.2 多字段模糊匹配:

SELECT * FROM `magazine` WHERE CONCAT(`title`,`tag`,`description`) LIKE ‘%关键字%’

单字段多like模糊匹配:

SELECT * FROM [user] WHERE u_name LIKE '%三%' AND u_name LIKE '%猫%'

多字段 + 多like联合匹配:(参考:https://www.cnblogs.com/chywx/p/10154990.html)

select * from djk_corpus where concat(TestSubject,AnswerA,AnswerB,AnswerC,AnswerD,AnswerE) NOT REGEXP "png|jpg|jpeg|gif"

TestSubject,AnswerA等这几个字段,都不包含,png,jpg等,这些字段
匹配 p1 或 p2 或 p3。例如,‘z|food’ 能匹配 “z” 或 “food”。’(z|f)ood’ 则匹配 “zood” 或 “food”。
但是不能写成’p1&p2’,只能用"|"来写

还可以使用其他,但是需要注意顺序关系:
*(星号)和+(加号)都可以匹配多个该符号之前的字符。但是,+至少表示一个字符,而*可以表示0个字符。

SELECT * FROM baike369 WHERE name REGEXP 'a*c';
SELECT * FROM baike369 WHERE name REGEXP 'a+c';

其他方式:

select * from weibo_posts_21092h where
(
content like '%益生菌%'
OR content like '%益生元%'
OR content like '%微生态%'
OR content like '%后生元%'
OR content like '%微生物%'
)
AND
(
content like '%皮肤%'
OR content like '%肌肤%'
OR content like '%口腔%'
OR content like '%肠道%'
OR content like '%身体%'
OR content like '%人体%'
);

4.3 正则模糊匹配

来自:MySQL匹配指定字符串的查询

从baike369表的name字段中查询包含“a”到“w”字母和数字以外的字符的记录。SQL代码如下:

SELECT * FROM baike369 WHERE name REGEXP '[^a-w0-9]';

查看name字段中查询包含“a”到“w”字母和数字以外的字符的记录的操作效果。

使用方括号([])可以将需要查询的字符组成一个字符集;通过“[abc]”可以查询包含a、b和c等3个字母中任何一个的记录。

SELECT * FROM baike369 WHERE name REGEXP '[ceo]';

name字段中查询出包含数字的记录。

SELECT * FROM baike369 WHERE name REGEXP '[0-9]';

查询包含数字或者字母a、b和c的记录

SELECT * FROM baike369 WHERE name REGEXP '[0-9a-c]';

4.4 多个关键词匹配,并集关系(不是 | )

可行的是下面的这种:

data[data['col'].str.contains('(?=.*组合)(?=.*衣物)^.*$' , regex=True )]****

不可行的有:

data[data['col'].str.contains('组合(.*?)衣物' , regex=True )]
data[data['col'].str.contains('组合*衣物' , regex=True )]
data[data['col'].str.contains('组合+衣物' , regex=True )]
data[data['col'].str.contains('.*[(组合)(衣物)].*' , regex=True )]

5 报错类型

5.1 报错1:ProgrammingError

ProgrammingError: (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'\\\\"]", "1", "", "", "", "", "", "", "", "", "", "水", "7732-18-5", "WATER\' at line 2')

有人说是因为转义问题,所以:
可以使用:pymysql.escape_string(cdv)对文字进行一定编码(有点像,as.numbic() )

import pymysql
pymysql.escape_string('大花田菁(SESBANIAGRANDIFLORA)花')

5.2 报错二:\xF0\x9F\x92\x9C\xF0\x9F 字符编码报错

(1366, "Incorrect string value: '\xF0\x9F\x92\x9C\xF0\x9F...' for column 'text' at row,\xF0\x9F\x92\x9C\xF0\x9F,

这种原因因为MySQL不识别有些字符的utf8编码(如表情字符),这时你需要指定连接字符编码为utf8mb4。数据表对应字段编码也改成utf8mb4

5.3 报错三: Incorrect string value

[ERR] 1366 - Incorrect string value: ‘\xE4\xB8\xAD\xE5\x86\xB6…’ for column ‘楼盘名称’ at row 1

5.4 报错四:not all arguments converted during string formatting

文章:python3 pandas to_sql填坑

Execution failed on sql ‘SELECT name FROM sqlite_master WHERE type=‘table’ AND name=?;’: not all arguments converted during string formatting
数据库链接不再使用pymysql,而改用sqlalchemy,con=engine 而不是con=db 官方文档

但是,如果按照如上写法,在python3.6(我的python版本)环境下会出现找不到mysqldb模块错误!
正确的写法如下,因为python3将mysqldb改为pymysql了!!!
mysql+pymysql://root:root@localhost:3306/tushare?charset=utf8

5.5 报错五:DB-Lib error message 20018

当我用pymmsql执行,

cursor.execute('SELECT * FROM persons WHERE salesrep=John ')

会报错:

DatabaseError: Execution failed on sql 'select * from fresh_ax_product where itemid = "d"': (207, b"Invalid column name 'd'.DB-Lib error message 20018, severity 16:\nGeneral SQL Server error: Check messages from the SQL Server\n")

那么这里用pymmsql 来进行一些where语句的时候,就需要一些特殊的写入方式:

cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

参考:Python连接SQL Server数据库 - pymssql使用基础

5.6 报错六:合并表的时候,编码报错

Cannot resolve the collation conflict between "Chinese_PRC_CI_AS" and "Chinese_PRC_Stroke_90_CI_AS_WS" in the equal to operation. (468)

解决:

select a1.*,a2.* from
(select *,(left(txno,4)) ShopNumber from xxx1) a1
left join xxx2 a2
on a1.fet1 = a2.fet2COLLATE Chinese_PRC_CI_AS

6 一些笔者的自建函数

6.1 打包查询函数

import pymysqlclass mysql_query():def __init__(self,user = "root",password = "123456",\host = '47.103.114.113',port = 3306,\database="d-lab"):self.user = userself.password = passwordself.host = hostself.port = portself.database = databasedef query(self,sql):conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)# 得到一个可以执行SQL语句的光标对象cursor = conn.cursor()try:#sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"cursor.execute(sql)    # 执行sql语句query_out = cursor.fetchall()    # 获取查询的所有记录except Exception as e:print(e)finally:conn.close()return query_outdef execute(self,sql):conn = pymysql.connect(host=self.host, user=self.user,password=self.password,database=self.database)# 得到一个可以执行SQL语句的光标对象cursor = conn.cursor()try:#sql = "SELECT * FROM xiaohongshu_article_2 limit 1000"cursor.execute(sql)    # 执行sql语句conn.commit() #提交到数据库执行except Exception as e:print(e)conn.rollback() #发生错误后回滚finally:conn.close()return 'success'mq = mysql_query(user = "xxx",password = "xxx",\host = 'xxx',port = 3306,\database="xxx")sql = "select count(*) from data "
mq.query(sql)
  • query,针对查
  • execute,专门针对更新

6.2 DButils的使用

python使用dbutils的PooledDB连接池,操作数据库

"""
使用DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysqlclass MysqlClient(object):__pool = None;def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,maxusage=100, setsession=None, reset=True,host='127.0.0.1', port=3306, db='test',user='root', passwd='123456', charset='utf8mb4'):""":param mincached:连接池中空闲连接的初始数量:param maxcached:连接池中空闲连接的最大数量:param maxshared:共享连接的最大数量:param maxconnections:创建连接池的最大数量:param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理:param maxusage:单个连接的最大重复使用次数:param setsession:optional list of SQL commands that may serve to preparethe session, e.g. ["set datestyle to ...", "set time zone ..."]:param reset:how connections should be reset when returned to the pool(False or None to rollback transcations started with begin(),True to always issue a rollback for safety's sake):param host:数据库ip地址:param port:数据库端口:param db:库名:param user:用户名:param passwd:密码:param charset:字符编码"""if not self.__pool:self.__class__.__pool = PooledDB(pymysql,mincached, maxcached,maxshared, maxconnections, blocking,maxusage, setsession, reset,host=host, port=port, db=db,user=user, passwd=passwd,charset=charset,cursorclass=pymysql.cursors.DictCursor)self._conn = Noneself._cursor = Noneself.__get_conn()def __get_conn(self):self._conn = self.__pool.connection();self._cursor = self._conn.cursor();def close(self):try:self._cursor.close()self._conn.close()except Exception as e:print edef __execute(self, sql, param=()):count = self._cursor.execute(sql, param)print countreturn count@staticmethoddef __dict_datetime_obj_to_str(result_dict):"""把字典里面的datatime对象转成字符串,使json转换不出错"""if result_dict:result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}result_dict.update(result_replace)return result_dictdef select_one(self, sql, param=()):"""查询单个结果"""count = self.__execute(sql, param)result = self._cursor.fetchone()""":type result:dict"""result = self.__dict_datetime_obj_to_str(result)return count, resultdef select_many(self, sql, param=()):"""查询多个结果:param sql: qsl语句:param param: sql参数:return: 结果数量和查询结果集"""count = self.__execute(sql, param)result = self._cursor.fetchall()""":type result:list"""[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]return count, resultdef execute(self, sql, param=()):count = self.__execute(sql, param)return countdef begin(self):"""开启事务"""self._conn.autocommit(0)def end(self, option='commit'):"""结束事务"""if option == 'commit':self._conn.autocommit()else:self._conn.rollback()if __name__ == "__main__":mc = MysqlClient()sql1 = 'SELECT * FROM shiji  WHERE  id = 1'result1 = mc.select_one(sql1)print json.dumps(result1[1], ensure_ascii=False)sql2 = 'SELECT * FROM shiji  WHERE  id IN (%s,%s,%s)'param = (2, 3, 4)print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)

7 一些应用

7.1 时间创建与写入

利用to_sql导入数据

import pandas as pd
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engineyconnect = create_engine('mysql+pymysql://user:password@111.111.111.111:3306/your_database_name?charset=utf8mb4')df = pd.DataFrame({'序号':[2],'id':[2],'name2':["user 10"],'time':[datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")]})
# 导入
df.to_sql(name='test_8', con=yconnect, if_exists='append', index=False,dtype = {'time':sqlalchemy.DateTime()})

导入的定义不同字段的数据格式
如果,表格里面该字段已经是时间格式了,那么就可以直接插入:

# sql语句:
table_name = 'test_8'
sql = "update {} set time='{}' where id= {}".format(table_name,datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),2)
out = mc._cursor.execute(sql,)>>> update test_8 set time='2020-12-15 09:39:05' where id= 2

反正几种py时间生成的方式,都是符合规定:

#第一种
update_time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#第二种
update_time=time.strftime('%Y-%m-%d %X',time.localtime(time.time()))

7.2 利用Pandas快速读入mysql / mmsql

简单写了一个可以连接mysql / mmsql的小函数,通过Pandas直接调用

import pymssql
import pymysql
import pandas as pd
import numpy as np
from collections import Counterdef connect_db(host= '0.0.0.0' , user='xxx',\password= 'xxx', database = 'xxx'):conn = pymssql.connect(host , user , password, database)return conndef fetch_data_from_db(sql_query, host, user, password, database):"""Fetch data from SQL server"""con = connect_db(host , user , password, database)cursor = con.cursor()cursor.execute(sql_query)result = cursor.fetchall()col_result = cursor.descriptioncolumns = []for i in range(len(col_result)):columns.append(col_result[i][0])df = pd.DataFrame(list(result), columns = columns)con.close()return dfclass pandas_read():def __init__(self,sql_config,types = 'mmsql'):self.sql_config = sql_configif types == 'mmsql':self.conn = self.get_conn_mmsql(sql_config)if types == 'mysql':self.conn = self.get_conn_mysql(sql_config)def get_conn_mmsql(self,mmsql_config):conn = pymssql.connect(mmsql_config['host'] , mmsql_config['user'] ,\mmsql_config['password'], mmsql_config['db'])return conndef get_conn_mysql(self,mysql_config):conn = pymysql.connect(host=mysql_config['host'], \user=mysql_config['user'],password=mysql_config['password'], \db=mysql_config['db'],charset='utf8', \use_unicode=True)return conndef read_sql(self,sql):return pd.read_sql(sql, con=self.conn)if __name__ == "__main__":user = 'xxx'password = 'xxx'host = '0.0.0.0'database = 'xxx'# 查询sql_query = 'select top 5000  * from database'data = fetch_data_from_db(sql_query, host, user, password, database)# pandas 读入sql_config = {'user':user,'password':password,'host':host,'db':'xxxdb'}pr = pandas_read(sql_config,types = 'mmsql')# 查询sql = 'select * from database'data = pr.read_sql(sql)

pymysql ︱mysql的基本操作与dbutils+PooledDB使用相关推荐

  1. 解决新版DBUtils使用连接池from DBUtils.PooledDB import PooledDB报错

    解决方法 将下面的代码: from DBUtils.PooledDB import PooledDB import pymysqlpool = PooledDB(pymysql, 5, **{&quo ...

  2. 基于linux操作系统Mysql的基本操作(一)

    基于linux操作系统Mysql的基本操作(一) 背景介绍:mysql软连接的建立,ln –s /usr/local/mysql/bin/mysql  /usr/bin 1.本地登录,命令: #mys ...

  3. [JSP暑假实训] 三.MySQL数据库基本操作及Servlet网站连接显示数据库信息

    本系列文章是作者暑假给学生进行实训分享的笔记,主要介绍MyEclipse环境下JSP网站开发,包括JAVA基础.网页布局.数据库基础.Servlet.前端后台数据库交互.DAO等知识. 前一篇文章讲解 ...

  4. mysql数据库基本操作总结与归纳

    mysql数据库基本操作总结与归纳 登录命令 mysql -u 用户名 -p 密码 列如: [root@localhost ~]# mysql -u root -p [root@localhost ~ ...

  5. Linux如何指向mysql_linux的基本操作(mysql 的基本操作)

    Mysql 的基本操作 在前面两个章节中已经介绍过MySQL的安装了,但是光会安装还不够,还需要会一些基本的相关操作.当然了,关于MySQL的内容也是非常多的,只不过对于linux系统管理员来讲,一些 ...

  6. php基础系列:从用户登录处理程序学习mysql扩展基本操作

    用户注册和登录是网站开发最基本的功能模块之一,现在通过登录处理程序代码来学些下php对mysql的基本操作. 本身没有难点,主要是作为开发人员,应该能做到手写这些基本代码,算是自己加强记忆,同时希望能 ...

  7. 1Python全栈之路系列之MySQL数据库基本操作

    Python全栈之路系列之MySQL数据库基本操作 MySQL数据库介绍 MySQL是一种快速易用的关系型数据库管理系统(RDBMS),很多企业都在使用它来构建自己的数据库. MySQL由一家瑞典公司 ...

  8. python3数据库框架_Python3 MySQL 数据库连接:安装pymysql(mysql数据库驱动), sqlalchemy(ORM框架)。...

    Python3 MySQL 数据库连接 python3使用mysql作为数据库,安装pymysql作为驱动,然后安装sqlalchemy框架 PyMySQL 驱动 安装: $ python3 -m p ...

  9. MySQL数据库——基本操作

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.数据库基本操作 (一).登陆数据库 1.交互登陆 mysql -u root -p 2.免交互登陆 mysql - ...

最新文章

  1. 怎么用python画个电脑_python语言还是java如何用python画爱心
  2. Activity中加载器的总结
  3. 数组实例的find() 和 findIndex()方法
  4. 分布式通信协议RPC协议简介
  5. 网约代收垃圾App火了!别笑,垃圾分类下一个就到你了
  6. 如何安装mysql5.7.15_ubuntu16.04安装mysql5.7.15
  7. php编程之如何调用支付宝支付接口的实现
  8. Mac平台的MySQL管理工具
  9. 日本家用电器技术标准及IEC对照介绍
  10. matlab标准数据,Matlab数据标准化实现
  11. 传智播客黑马Java学习笔记_day07
  12. mysql能上传程序吗_利用mysql上传和执行文件
  13. 职称论文发表时怎么选择期刊
  14. ajax中返回sucess里使用this.$message()
  15. wince之WiFi漫游的工作原理
  16. macOS 安装 Adobe Zii 2019 for Adobe
  17. 软件测试方法和技术第一章——引论
  18. matlab-基础 复数 实部、虚部、模、共轭、辐角
  19. python 给QQ好友发信息
  20. 怎么让在线视频播放html,HTML5网页视频强制变速倍速播放

热门文章

  1. 微信小程序登录 code 40029 天坑
  2. 2018-04-26java实习面试记录
  3. 文件磁盘相关函数[2]-建立新文件 FileCreate
  4. 选择器高级、样式及布局
  5. nginx 常用优化
  6. ES6系列_2之新的声明方式
  7. linux下部署node+vue文件
  8. XRDP与VNC的关系
  9. 一副对联,送给所有创业小公司
  10. Kickstart +ftp+dhcp+tftp实现Linux系统的无人值守安装