现状

在工作中难免会使用数据库,为了能够高效并发访问数据库,数据库连接池必不可少,由于本站copy模式盛行,导致数据库连接池被错误使用,遇到错误甚至追求能跑通就行。

本文就python版本的数据库链接池模块在实际使用场景样例,来说明如何正确合理的使用数据库连接池。

业务场景

在部署机器学习模型时采用的是flask框架,模型预测本身是一个很快的事情,无奈有太多的特征需要通过接口(或者是ots,mysql等)获取,导致响应时效性降低。

为了能很好的实现并发性,提升QPS,采用gunicorn进行多进程,异步处理方案。

此时单个进程只有一个数据库链接,就会导致异步执行的线程共用同一个连接,从而导致报错,引入数据库连接池是必须的。

数据库连接池原理

通过预先建立链接,放到然后list中,使用的时候,从list中取出一个链接,使用使用完成后归还连接。当线程太多,链接池中没有链接的时候,可以选择block,等到有链接可用的时候返回,或者是直接返回错误。

dbutils已经实现了两种pooldb:

PooledDB :可以被多线程共享的链接,适用于异步场景,不断有新线程进来获取连接池,本文使用该方案。
PersistentDB:下面这句话表示,对线程的要求是持续稳定的,不能产生新的线程。Measures are taken to make the database connections thread-affine.
This means the same thread always uses the same cached connection,
and no other thread will use it.  So even if the underlying DB-API module
is not thread-safe at the connection level this will be no problem here.For best performance, the application server should keep threads persistent.

dbutils结构如下

db结尾的是mysql等数据库专用。

pg结尾的是PostgreSQL专用。

如上交代完之后,相信你对数据库链接池有较为全面的认知了,好了具体实现代码如下:

主要代码框架逻辑:

1、初始化连接池

2、获取链接

3、查询数据库

4、close链接(返回给连接池,并不是真正的关闭连接池)

5、具体数据查询&解析逻辑根据业务修改,此处提供了sql_fetch_json函数,返回json格式数据。

6、test1为多线程测试,此处自己多运行体会查询结果。

# coding=utf-8
import randomimport threading
from dbutils.pooled_db import PooledDB
from dbutils.persistent_db import PersistentDBimport time
import pymysqlfrom configuration.config import system_logger, db_configclass MysqlHelper(object):def __init__(self, db_config):self.__pool = PooledDB(creator=pymysql,mincached=1,maxcached=5,maxshared=5,maxconnections=5,maxusage=5,blocking=True,user=db_config.get('user'),passwd=db_config.get('password'),db=db_config.get('database'),host=db_config.get('host'),port=db_config.get('port'),charset=db_config.get('charset'),)def getConn(self):conn = self.__pool.connection()  # 从连接池获取一个链接cursor = conn.cursor()return conn, cursor@staticmethoddef dispose(cursor, conn):cursor.close()conn.close()def getOne(self, sql):conn, cursor = self.getConn()th_name = threading.currentThread().getName()# print(f'{th_name} {self.conn} {self.cursor} {time.time():.4f} start {sql}')cursor.execute(sql)rows = cursor.fetchall()print(f"{th_name} {conn} {cursor} {time.time():.4f} {rows}")# self.dispose()self.dispose(cursor, conn)return rowsdef queryOne(self, sql):system_logger.info("----------------------sql start ----------------------")system_logger.info(sql)try:conn, cursor = self.getConn()result = cursor.execute(sql)# rows = cursor.fetchall()json_data = self.sql_fetch_json(cursor)# 将连接返回self.dispose(cursor, conn)system_logger.info(f"-----------------------queryByKey result:{result} " + str(json_data))if len(json_data) == 1:return json_data[0]return Noneexcept Exception as e:system_logger.info("-----------predict exception line: " + str(e.__traceback__.tb_lineno) + " of " +e.__traceback__.tb_frame.f_globals["__file__"])system_logger.info(e)return None@staticmethoddef sql_fetch_json(cursor: pymysql.cursors.Cursor):""" Convert the pymysql SELECT result to json format """keys = []for column in cursor.description:keys.append(column[0])key_number = len(keys)json_data = []for row in cursor.fetchall():item = dict()for q in range(key_number):item[keys[q]] = row[q]json_data.append(item)return json_datadef test1(pool):phone_no = f"1390709000{random.randint(6,7)}"strsql = f"select * from zy_phone where policy_holder_phone_no={phone_no} order by insure_date " \+ "desc, kafka_etl_time asc limit 1 "while True:time.sleep(1)pool.getOne(strsql)# time.sleep(0.001)j = 0th_name = threading.currentThread().getName()# if th_name in ['Thread-2','Thread-5']:#     # print(f"task {th_name}")#     time.sleep(0.003)def main(pool):# pool.getConn()ths = []for i in range(5):th = threading.Thread(target=test1, args=(pool,))ths.append(th)for th in ths:th.start()for th in ths:th.join()if __name__ == "__main__":mysqlhelper = MysqlHelper(db_config)main(mysqlhelper)time.sleep(3)while True:time.sleep(1)

常见错误使用方法1:

 def getConn(self):self.conn = self.__pool.connection()self.cursor = self.conn.cursor()
此处不应该共享链接,和cursor,会导致报错:

AttributeError: 'NoneType' object has no attribute 'read'

或者:

AttributeError: 'NoneType' object has no attribute ‘settimeout‘

常见错误使用方法2:

获取链接以及查询的时候加锁

lock.acquire()
pool.getConn()
pool.getOne(strsql)
lock.release()
time.sleep(1)

因为pooldb本身就会加锁,参见如下源码中,自己在从链接池获取链接,到cursor获取数据的时候加锁,会导致锁冗余,此时连接池会退化成单个数据库链接。

self.__pool.connection() 逻辑如下:

    def connection(self, shareable=True):"""Get a steady, cached DB-API 2 connection from the pool.If shareable is set and the underlying DB-API 2 allows it,then the connection may be shared with other threads."""if shareable and self._maxshared:with self._lock:while (not self._shared_cache and self._maxconnectionsand self._connections >= self._maxconnections):self._wait_lock()if len(self._shared_cache) < self._maxshared:# shared cache is not full, get a dedicated connectiontry:  # first try to get it from the idle cachecon = self._idle_cache.pop(0)except IndexError:  # else get a fresh connectioncon = self.steady_connection()else:con._ping_check()  # check this connectioncon = SharedDBConnection(con)self._connections += 1else:  # shared cache full or no more connections allowedself._shared_cache.sort()  # least shared connection firstcon = self._shared_cache.pop(0)  # get itwhile con.con._transaction:# do not share connections which are in a transactionself._shared_cache.insert(0, con)self._wait_lock()self._shared_cache.sort()con = self._shared_cache.pop(0)con.con._ping_check()  # check the underlying connectioncon.share()  # increase share of this connection# put the connection (back) into the shared cacheself._shared_cache.append(con)self._lock.notify()con = PooledSharedDBConnection(self, con)else:  # try to get a dedicated connectionwith self._lock:while (self._maxconnectionsand self._connections >= self._maxconnections):self._wait_lock()# connection limit not reached, get a dedicated connectiontry:  # first try to get it from the idle cachecon = self._idle_cache.pop(0)except IndexError:  # else get a fresh connectioncon = self.steady_connection()else:con._ping_check()  # check connectioncon = PooledDedicatedDBConnection(self, con)self._connections += 1return con

到此本文结束,如果觉得有收获,就点个赞吧。

python多线程并发访问数据库连接池原理以及代码相关推荐

  1. Golang 侧数据库连接池原理和参数调优

    Golang 侧数据库连接池原理和参数调优 文章目录 Golang 侧数据库连接池原理和参数调优 数据库连接池 数据库连接池的设计 Go 的数据库连接池 Go 数据库连接池的设计 建立连接 释放连接 ...

  2. Java并发编程(03):多线程并发访问,同步控制

    本文源码:GitHub·点这里 || GitEE·点这里 一.并发问题 多线程学习的时候,要面对的第一个复杂问题就是,并发模式下变量的访问,如果不理清楚内在流程和原因,经常会出现这样一个问题:线程处理 ...

  3. 数据库连接池原理以及好处

    本篇内容综合广大网友提供内容,笔者经过整理,对数据库连接池原理和实现过程做个很系统的并且通俗易懂的分析讲解,以及手写一个连接池实现过程作为演示. 一.早期通过JDBC方式操作数据库 我们先来看早期使用 ...

  4. 数据库连接池原理和使用

    数据库连接池负责分配.管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个:释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏 ...

  5. java多线程并发及线程池

    线程的常用创建方式 1.继承Thread类创建线程类 public class FirstThreadTest extends Thread {public void run(){System.out ...

  6. 4 多线程应用:数据库连接池

    4 多线程应用:数据库连接池 分类: 读书笔记2009-11-09 15:46 747人阅读 评论(0) 收藏 举报 数据库连接池多线程object数据库exceptionstring 首先说明一下: ...

  7. java jdbc close原理_JDBC数据库连接池原理

    JDBC是java数据库连接的简称.它是一种用于实行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用java语言编写的类和接口组成.其相关的API都在java.sql.*包下 ...

  8. python多线程并发

    python多线程并发 遍历数据库,然后查询历史记录,然后分析 数据查询100ms,这时需要3分钟,加了并发处理后,需要1.2分钟 后来数据库加了索引,需要6秒就可以了, 总结:加索引能带来30倍的优 ...

  9. ios并发会造成什么问题_iOS Core data多线程并发访问的问题

    大家都知道Core data本身并不是一个并发安全的架构:不过针对多线程访问带来的问题,Apple给出了很多指导:同时很多第三方的开发者也贡献了很多解决方法.不过最近碰到的一个问题很奇怪,觉得有一定的 ...

最新文章

  1. 谷歌利用人工智能设计的芯片揭示了智能的本质
  2. 【DIY】一个名叫“故事鸡”的儿童玩具是如何用树莓派3B+练成的
  3. Teradata QA Tester 职位
  4. 转:求多边形的面积 算法几何
  5. 银行业务软件测试,银行业务软件系统测试研究
  6. 【编程开发】Python---列表
  7. 数字加密c语言程序_国外程序员整理的 C++ 资源大全
  8. 日历对象导哪个包_微信新表情瞬间炸裂,文物表情包永恒萌呆!
  9. idea多级目录不展开的问题
  10. php跳过一段html,PHP_一段能瞬间秒杀所有版本IE的简单HTML代码,许多人都非常讨厌Internet Explore - phpStudy...
  11. 共享可写节包含重定位_周末去哪?来云浮!来乡村美食(番薯)节!
  12. Oracle使用mybatis批量添加
  13. python调用通达信函数用户指标_通达信自定义指标调用
  14. 一种基于RABC的软件系统权限管理设计
  15. php实现 合唱队形(算法想清楚在动)
  16. js实现兼容的本地化存储方案
  17. 第三方支付风险控制研究 ——以支付宝为例
  18. 7.2版升7.5+php7,帝国CMS7.2版升级到7.5版的详细步骤方法(已测)
  19. 数据科学学习笔记8 --- 分类(有监督的学习)
  20. 全国2013年10月考试《行政组织理论》试题和答案

热门文章

  1. 关于APP唤醒的方法总结
  2. 哼出调调来(2015年3月英语总结)
  3. 训练集准确率很高,验证集准确率低问题
  4. 居安思危,思则有备,有备无患
  5. DeepDream图像生成教程
  6. OAuth 2.0 MAC Tokens
  7. 使用POI将office(doc/docx/ppt/pptx/xls/xlsx)文件转html格式(附带源码)
  8. sign (数学符号函数)
  9. ggplot2设置坐标轴范围_R语言画展ggplot2篇:如何改变分类变量坐标轴的顺序?
  10. Origin——同时(指数形式)拟合三条曲线(参数共享的全局拟合)