python socketpool:通用连接池
简介
在软件开发中经常要管理各种“连接”资源,通常我们会使用对应的连接池来管理,比如mysql数据库连接可以用sqlalchemy中的池来管理,thrift连接可以通过thriftpool管理,redis-py中的StrictRedis实现本身就是基于连接池的,等等。 而今天介绍的socketpool是一个通用的python连接池库,通过它可以实现任意类型连接的管理,虽然不是很完美,但在一些找不到合适连接池实现、而又不想自己造轮子的时候使用起来会节省很多精力。
内部实现要点
- 这个类库的代码其实并不是特别的漂亮,但结构设计的不错,关键留下了对拓展开放的钩子,能让使用者根据自己的需要定制自己的连接池
- 内部主要的组件有ConnectionPool,Connector和backend_mod三个
- ConnectionPool实现了一个连接池的通用逻辑,用一个优先级队列管理所有连接,另外支持connection的生命周期定制,有一个reap机制(可选),基本思想是每个conn有一个最大生命周期,比如600秒,过了这个时间,就必须回收掉,reap线程(也有可能是greenlet或eventlet)定期检查过期的conn并进行回收
- Connector是一个接口,它可以看做是一个制造conn的工厂,ConnectionPool在需要新建conn的时候,会通过这个工厂来生成conn。所以我们只要实现Connector的接口方法就可以定制一个自己的连接工厂
- backend_mod是为了支持不同的线程模型(比如python原生线程,gevent或者eventlet)抽象出来的后端模块,它统一封装了Socket, PriorityQueue, Semaphore等和并发模型相关的组件,在创造ConnectionPool对象时可以通过参数控制选用哪种backend
部分代码阅读
ConnectionPool的初始化函数
def __init__(self, factory,retry_max=3, retry_delay=.1,timeout=-1, max_lifetime=600.,max_size=10, options=None,reap_connections=True, reap_delay=1,backend="thread"):if isinstance(backend, str):self.backend_mod = load_backend(backend)self.backend = backendelse:self.backend_mod = backendself.backend = str(getattr(backend, '__name__', backend))self.max_size = max_sizeself.pool = getattr(self.backend_mod, 'PriorityQueue')()self._free_conns = 0self.factory = factoryself.retry_max = retry_maxself.retry_delay = retry_delayself.timeout = timeoutself.max_lifetime = max_lifetimeif options is None:self.options = {"backend_mod": self.backend_mod,"pool": self}else:self.options = optionsself.options["backend_mod"] = self.backend_modself.options["pool"] = self# bounded semaphore to make self._alive 'safe'self._sem = self.backend_mod.Semaphore(1)self._reaper = Noneif reap_connections:self.reap_delay = reap_delayself.start_reaper()
这里几个参数的意义:
- factory是类对象,需要实现Connector接口,用来生成conn,options是调用factory时传入的参数
- retry_max是获取conn时如果出错最多重试几次
- max_lifetime是规定每个conn最大生命时间,见上面说的reap机制
- max_size是这个pool的大小上限
- backend是线程模型
- reap_connections控制是否启用reap机制
被启动的reap就是一个单独的线程,定时调用下面的方法把过期的conn回收掉:
def murder_connections(self):current_pool_size = self.pool.qsize()if current_pool_size > 0:for priority, candidate in self.pool:current_pool_size -= 1if not self.too_old(candidate):self.pool.put((priority, candidate))else:self._reap_connection(candidate)if current_pool_size <= 0:break
_reap_connection最终会回调conn对象的invalidate方法(Connector的接口)进行销毁。每次使用完conn后会调用release_connection, 它的逻辑是
def release_connection(self, conn):if self._reaper is not None:self._reaper.ensure_started()with self._sem:if self.pool.qsize() < self.max_size:connected = conn.is_connected()if connected and not self.too_old(conn):self.pool.put((conn.get_lifetime(), conn))else:self._reap_connection(conn)else:self._reap_connection(conn)
如果连接还没过期或断开,就会被重新放入优先级队列中,用户可以通过实现Connector接口的get_lifetime来控制这里放回的conn的优先级,priority最小的conn下次会被优先取出
Connector定义了哪些接口呢?
class Connector(object):def matches(self, **match_options):raise NotImplementedError()def is_connected(self):raise NotImplementedError()def handle_exception(self, exception):raise NotImplementedError()def get_lifetime(self):raise NotImplementedError()def invalidate(self):raise NotImplementedError()
matches方法主要用在pool取出一个conn时,除了优先选择priority最小的conn,还需要这个conn和get(**options)传入的参数match,这个match就是回调conn的matches方法。其他几个接口前面都涉及到了。
TcpConnector实现
来看一下socketpool自带的TcpConnector的实现,实现tcp socket的工厂
class TcpConnector(Connector):def __init__(self, host, port, backend_mod, pool=None):self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)self._s.connect((host, port))self.host = hostself.port = portself.backend_mod = backend_modself._connected = True# use a 'jiggle' value to make sure there is some# randomization to expiry, to avoid many conns expiring very# closely together.self._life = time.time() - random.randint(0, 10)self._pool = pooldef __del__(self):self.release()def matches(self, **match_options):target_host = match_options.get('host')target_port = match_options.get('port')return target_host == self.host and target_port == self.portdef is_connected(self):if self._connected:return util.is_connected(self._s)return Falsedef handle_exception(self, exception):print('got an exception')print(str(exception))def get_lifetime(self):return self._lifedef invalidate(self):self._s.close()self._connected = Falseself._life = -1def release(self):if self._pool is not None:if self._connected:self._pool.release_connection(self)else:self._pool = Nonedef send(self, data):return self._s.send(data)def recv(self, size=1024):return self._s.recv(size)
不需要太多额外解释。
拓展实现HiveConnector
根据自身项目需要,我用pyhs2实现了一个hive连接池
class HiveConnector(Connector):def __init__(self, host, port, backend_mod, pool=None, authMechanism='NOSASL',**options):self.host = hostself.port = portself.backend_mod = backend_modself._pool = poolself._connected = Falseself._conn = pyhs2.connect(host=host,port=port,authMechanism=authMechanism)self._connected = True# use a 'jiggle' value to make sure there is some# randomization to expiry, to avoid many conns expiring very# closely together.self._life = time.time() - random.randint(0, 10)def __del__(self):self.release()def matches(self, **match_options):target_host = match_options.get('host')target_port = match_options.get('port')return target_host == self.host and target_port == self.portdef is_connected(self):return self._connecteddef handle_exception(self, exception):logger.exception("error: %s" % str(exception))def get_lifetime(self):return self._lifedef invalidate(self):try:self._conn.close()except:passfinally:self._connected = Falseself._life = -1def release(self):if self._pool is not None:if self._connected:self._pool.release_connection(self)else:self._pool = Nonedef cursor(self):return self._conn.cursor()def execute(self, hql):with self.curosr() as cur:return cur.execute(hql)hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)
使用这个hive_pool去执行hql语句非常容易:
with hive_pool.connection() as conn:with conn.cursor() as cur:print cur.getDatabases()
总结
简绍了socketpool的内部实现,以及如何使用它构造自己的连接池。
转载于:https://www.cnblogs.com/quijote/p/4388900.html
python socketpool:通用连接池相关推荐
- mysql连接池_基于Swoole的通用连接池 - 数据库连接池(life)
open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池. 依赖 依赖版本PHP>=7.0.0Swoole>=4.2.9Recommend ...
- swoole mysql 连接池_基于Swoole的通用连接池 - 数据库连接池
连接池 open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池. 依赖 依赖 版本 >=7.0.0 >=4.2.9 Recommend ...
- python requests 异步调用_构建高效的python requests长连接池详解
前文: 最近在搞全网的CDN刷新系统,在性能调优时遇到了requests长连接的一个问题,以前关注过长连接太多造成浪费的问题,但因为系统都是分布式扩展的,针对这种各别问题就懒得改动了. 现在开发的缓存 ...
- python sqlserver api连接池_非常老的话题 SQLSERVER连接池
非常老的话题 SQLSERVER连接池 写这篇文章不是说要炒冷饭,因为园子里有非常非常多关于SQLSERVER连接池的文章,但是他们说的都是引用MSDN里的解释 或者自己做一些测试试验一下连接池的性能 ...
- python pymysql使用连接池连接mysql示例
不使用连接池: import pymysqlmysql_config = {"db": "test_db","host": "12 ...
- Golang 通用连接池
在使用之前我们需要先了解清楚连接池的概念,总结下来连接池主要解决以下几类问题: 减少连接创建时间 不论是与数据库还是Thrift等程序建立连接都是有开销的.如果这类连接是"循环"使 ...
- python连接池原理_python redis之连接池的原理
python redis之连接池的原理 什么是连接池 通常情况下, 当我们需要做redis操作时, 会创建一个连接, 并基于这个连接进行redis操作, 操作完成后, 释放连接, 一般情况下, 这是没 ...
- sybase jz0c0 连接已关闭_Go 基于 channel 实现连接池
golang的channel除了goroutine通信之外还有很多其他的功能,本文将实现一种基于channel的通用连接池. 何为通用? 连接池的实现不依赖具体的实例,而依赖某个接口,本文的连接池选用 ...
- 一篇搞懂TCP、HTTP、Socket、Socket连接池
上一篇:闲鱼面试官:Thread.sleep(0) 到底有什么用?我:有点懵~ 作者:数澜科技 链接:https://www.jianshu.com/p/e47a766e03da 前言:作为一名开发人 ...
最新文章
- php sql 条件拼组_ThinkPHP框架SQL操作链式写法原理(浅显易懂)
- 他不怕被拒绝_【保险知识】高情商保险营销,再也不怕被拒绝!
- 前端学习(1947)vue之电商管理系统电商系统之使用自定义模板渲染
- 手把手教你爬虫requests实战演练——python篇
- Java 堆内存是线程共享的!面试官:你确定吗?
- 人工神经网络之Python 实战
- IIS7.5应用程序池集成模式和经典模式的区别介绍
- ubuntu下鼠标右键新建文档
- C++ 原子操作与无锁编程
- 周志华:关于机器学习的一点思考
- 电脑本地连接,电脑出现多个“本地连接”的解决方法
- 解决端口占用问题 Port xxxx was already in use
- 关于防火墙DMZ区的使用和防火墙的DMZ区域规则的配置
- 数模论文排版—从第三页设置页码,页码格式为page X of Y
- 逆向某停车app(原创)
- celeste mod如何安装
- 大专学历走社招,两个部门,六轮面试,终与字节无缘
- esp32➡遥控篇➡turtlesim➡mobot➡turtlebot3
- stm32cubemx配置pwm
- 《Python编程 从入门到实践》 一、基础知识 第六章 字典
热门文章
- HttpClient 中文官方教程----第一章基础知识-只收录,未测试
- Puppet 4 性能提升超2倍,升级前应该你知悉的变化
- C语言 · 求矩阵各个元素的和
- Apache Spark 2.0预览: 机器学习模型持久化
- maven scala plugin 实现jvmArgs,执行过程原理解析笔记
- 转发与重定向(forward与redirect)
- ystep jQuery流程、步骤插件
- 利用GetPrivateProfileString读取配置文件(.ini)
- 摆线减速器原理、减速比、设计方法
- 划词翻译软件QTranslate 6.7.3 中文绿色版