简介

在软件开发中经常要管理各种“连接”资源,通常我们会使用对应的连接池来管理,比如mysql数据库连接可以用sqlalchemy中的池来管理,thrift连接可以通过thriftpool管理,redis-py中的StrictRedis实现本身就是基于连接池的,等等。 而今天介绍的socketpool是一个通用的python连接池库,通过它可以实现任意类型连接的管理,虽然不是很完美,但在一些找不到合适连接池实现、而又不想自己造轮子的时候使用起来会节省很多精力。

内部实现要点

  • 这个类库的代码其实并不是特别的漂亮,但结构设计的不错,关键留下了对拓展开放的钩子,能让使用者根据自己的需要定制自己的连接池
  • 内部主要的组件有ConnectionPool,Connector和backend_mod三个
    • ConnectionPool实现了一个连接池的通用逻辑,用一个优先级队列管理所有连接,另外支持connection的生命周期定制,有一个reap机制(可选),基本思想是每个conn有一个最大生命周期,比如600秒,过了这个时间,就必须回收掉,reap线程(也有可能是greenlet或eventlet)定期检查过期的conn并进行回收
    • Connector是一个接口,它可以看做是一个制造conn的工厂,ConnectionPool在需要新建conn的时候,会通过这个工厂来生成conn。所以我们只要实现Connector的接口方法就可以定制一个自己的连接工厂
    • backend_mod是为了支持不同的线程模型(比如python原生线程,gevent或者eventlet)抽象出来的后端模块,它统一封装了Socket, PriorityQueue, Semaphore等和并发模型相关的组件,在创造ConnectionPool对象时可以通过参数控制选用哪种backend

部分代码阅读

ConnectionPool的初始化函数

     def __init__(self, factory,retry_max=3, retry_delay=.1,timeout=-1, max_lifetime=600.,max_size=10, options=None,reap_connections=True, reap_delay=1,backend="thread"):if isinstance(backend, str):self.backend_mod = load_backend(backend)self.backend = backendelse:self.backend_mod = backendself.backend = str(getattr(backend, '__name__', backend))self.max_size = max_sizeself.pool = getattr(self.backend_mod, 'PriorityQueue')()self._free_conns = 0self.factory = factoryself.retry_max = retry_maxself.retry_delay = retry_delayself.timeout = timeoutself.max_lifetime = max_lifetimeif options is None:self.options = {"backend_mod": self.backend_mod,"pool": self}else:self.options = optionsself.options["backend_mod"] = self.backend_modself.options["pool"] = self# bounded semaphore to make self._alive 'safe'self._sem = self.backend_mod.Semaphore(1)self._reaper = Noneif reap_connections:self.reap_delay = reap_delayself.start_reaper()

这里几个参数的意义:

  • factory是类对象,需要实现Connector接口,用来生成conn,options是调用factory时传入的参数
  • retry_max是获取conn时如果出错最多重试几次
  • max_lifetime是规定每个conn最大生命时间,见上面说的reap机制
  • max_size是这个pool的大小上限
  • backend是线程模型
  • reap_connections控制是否启用reap机制

被启动的reap就是一个单独的线程,定时调用下面的方法把过期的conn回收掉:

     def murder_connections(self):current_pool_size = self.pool.qsize()if current_pool_size > 0:for priority, candidate in self.pool:current_pool_size -= 1if not self.too_old(candidate):self.pool.put((priority, candidate))else:self._reap_connection(candidate)if current_pool_size <= 0:break

_reap_connection最终会回调conn对象的invalidate方法(Connector的接口)进行销毁。每次使用完conn后会调用release_connection, 它的逻辑是

     def release_connection(self, conn):if self._reaper is not None:self._reaper.ensure_started()with self._sem:if self.pool.qsize() < self.max_size:connected = conn.is_connected()if connected and not self.too_old(conn):self.pool.put((conn.get_lifetime(), conn))else:self._reap_connection(conn)else:self._reap_connection(conn)

如果连接还没过期或断开,就会被重新放入优先级队列中,用户可以通过实现Connector接口的get_lifetime来控制这里放回的conn的优先级,priority最小的conn下次会被优先取出

Connector定义了哪些接口呢?

 class Connector(object):def matches(self, **match_options):raise NotImplementedError()def is_connected(self):raise NotImplementedError()def handle_exception(self, exception):raise NotImplementedError()def get_lifetime(self):raise NotImplementedError()def invalidate(self):raise NotImplementedError()

matches方法主要用在pool取出一个conn时,除了优先选择priority最小的conn,还需要这个conn和get(**options)传入的参数match,这个match就是回调conn的matches方法。其他几个接口前面都涉及到了。

TcpConnector实现

来看一下socketpool自带的TcpConnector的实现,实现tcp socket的工厂

 class TcpConnector(Connector):def __init__(self, host, port, backend_mod, pool=None):self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)self._s.connect((host, port))self.host = hostself.port = portself.backend_mod = backend_modself._connected = True# use a 'jiggle' value to make sure there is some# randomization to expiry, to avoid many conns expiring very# closely together.self._life = time.time() - random.randint(0, 10)self._pool = pooldef __del__(self):self.release()def matches(self, **match_options):target_host = match_options.get('host')target_port = match_options.get('port')return target_host == self.host and target_port == self.portdef is_connected(self):if self._connected:return util.is_connected(self._s)return Falsedef handle_exception(self, exception):print('got an exception')print(str(exception))def get_lifetime(self):return self._lifedef invalidate(self):self._s.close()self._connected = Falseself._life = -1def release(self):if self._pool is not None:if self._connected:self._pool.release_connection(self)else:self._pool = Nonedef send(self, data):return self._s.send(data)def recv(self, size=1024):return self._s.recv(size)

不需要太多额外解释。

拓展实现HiveConnector

根据自身项目需要,我用pyhs2实现了一个hive连接池

 class HiveConnector(Connector):def __init__(self, host, port, backend_mod, pool=None, authMechanism='NOSASL',**options):self.host = hostself.port = portself.backend_mod = backend_modself._pool = poolself._connected = Falseself._conn = pyhs2.connect(host=host,port=port,authMechanism=authMechanism)self._connected = True# use a 'jiggle' value to make sure there is some# randomization to expiry, to avoid many conns expiring very# closely together.self._life = time.time() - random.randint(0, 10)def __del__(self):self.release()def matches(self, **match_options):target_host = match_options.get('host')target_port = match_options.get('port')return target_host == self.host and target_port == self.portdef is_connected(self):return self._connecteddef handle_exception(self, exception):logger.exception("error: %s" % str(exception))def get_lifetime(self):return self._lifedef invalidate(self):try:self._conn.close()except:passfinally:self._connected = Falseself._life = -1def release(self):if self._pool is not None:if self._connected:self._pool.release_connection(self)else:self._pool = Nonedef cursor(self):return self._conn.cursor()def execute(self, hql):with self.curosr() as cur:return cur.execute(hql)hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)

使用这个hive_pool去执行hql语句非常容易:

     with hive_pool.connection() as conn:with conn.cursor() as cur:print cur.getDatabases()

总结

简绍了socketpool的内部实现,以及如何使用它构造自己的连接池。

转载于:https://www.cnblogs.com/quijote/p/4388900.html

python socketpool:通用连接池相关推荐

  1. mysql连接池_基于Swoole的通用连接池 - 数据库连接池(life)

    open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池. 依赖 依赖版本PHP>=7.0.0Swoole>=4.2.9Recommend ...

  2. swoole mysql 连接池_基于Swoole的通用连接池 - 数据库连接池

    连接池 open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池. 依赖 依赖 版本 >=7.0.0 >=4.2.9 Recommend ...

  3. python requests 异步调用_构建高效的python requests长连接池详解

    前文: 最近在搞全网的CDN刷新系统,在性能调优时遇到了requests长连接的一个问题,以前关注过长连接太多造成浪费的问题,但因为系统都是分布式扩展的,针对这种各别问题就懒得改动了. 现在开发的缓存 ...

  4. python sqlserver api连接池_非常老的话题 SQLSERVER连接池

    非常老的话题 SQLSERVER连接池 写这篇文章不是说要炒冷饭,因为园子里有非常非常多关于SQLSERVER连接池的文章,但是他们说的都是引用MSDN里的解释 或者自己做一些测试试验一下连接池的性能 ...

  5. python pymysql使用连接池连接mysql示例

    不使用连接池: import pymysqlmysql_config = {"db": "test_db","host": "12 ...

  6. Golang 通用连接池

    在使用之前我们需要先了解清楚连接池的概念,总结下来连接池主要解决以下几类问题: 减少连接创建时间 不论是与数据库还是Thrift等程序建立连接都是有开销的.如果这类连接是"循环"使 ...

  7. python连接池原理_python redis之连接池的原理

    python redis之连接池的原理 什么是连接池 通常情况下, 当我们需要做redis操作时, 会创建一个连接, 并基于这个连接进行redis操作, 操作完成后, 释放连接, 一般情况下, 这是没 ...

  8. sybase jz0c0 连接已关闭_Go 基于 channel 实现连接池

    golang的channel除了goroutine通信之外还有很多其他的功能,本文将实现一种基于channel的通用连接池. 何为通用? 连接池的实现不依赖具体的实例,而依赖某个接口,本文的连接池选用 ...

  9. 一篇搞懂TCP、HTTP、Socket、Socket连接池

    上一篇:闲鱼面试官:Thread.sleep(0) 到底有什么用?我:有点懵~ 作者:数澜科技 链接:https://www.jianshu.com/p/e47a766e03da 前言:作为一名开发人 ...

最新文章

  1. php sql 条件拼组_ThinkPHP框架SQL操作链式写法原理(浅显易懂)
  2. 他不怕被拒绝_【保险知识】高情商保险营销,再也不怕被拒绝!
  3. 前端学习(1947)vue之电商管理系统电商系统之使用自定义模板渲染
  4. 手把手教你爬虫requests实战演练——python篇
  5. Java 堆内存是线程共享的!面试官:你确定吗?
  6. 人工神经网络之Python 实战
  7. IIS7.5应用程序池集成模式和经典模式的区别介绍
  8. ubuntu下鼠标右键新建文档
  9. C++ 原子操作与无锁编程
  10. 周志华:关于机器学习的一点思考
  11. 电脑本地连接,电脑出现多个“本地连接”的解决方法
  12. 解决端口占用问题 Port xxxx was already in use
  13. 关于防火墙DMZ区的使用和防火墙的DMZ区域规则的配置
  14. 数模论文排版—从第三页设置页码,页码格式为page X of Y
  15. 逆向某停车app(原创)
  16. celeste mod如何安装
  17. 大专学历走社招,两个部门,六轮面试,终与字节无缘
  18. esp32➡遥控篇➡turtlesim➡mobot➡turtlebot3
  19. stm32cubemx配置pwm
  20. 《Python编程 从入门到实践》 一、基础知识 第六章 字典

热门文章

  1. HttpClient 中文官方教程----第一章基础知识-只收录,未测试
  2. Puppet 4 性能提升超2倍,升级前应该你知悉的变化
  3. C语言 · 求矩阵各个元素的和
  4. Apache Spark 2.0预览: 机器学习模型持久化
  5. maven scala plugin 实现jvmArgs,执行过程原理解析笔记
  6. 转发与重定向(forward与redirect)
  7. ystep jQuery流程、步骤插件
  8. 利用GetPrivateProfileString读取配置文件(.ini)
  9. 摆线减速器原理、减速比、设计方法
  10. 划词翻译软件QTranslate 6.7.3 中文绿色版