pyhive数据库连接池使用
python连接hive的工具可以用 pyhive 和 impala,不管是哪个配置都比较麻烦。需要的依赖包比较多。
- https://github.com/cloudera/impyla
- https://github.com/dropbox/PyHive
pyhive模块没有提供数据库连接池的API。所以自己根据模块 mysql-connector-python 的连接池改装了一个 pyhive 的连接池,效率会提升不少。
连接池介绍
# hive_pool.py
import re
from pyhive import hivetry:import queue
except ImportError:import Queue as queue
import threadingCONNECTION_POOL_LOCK = threading.RLock()
CNX_POOL_MAXSIZE = 32
CNX_POOL_MAXNAMESIZE = 64
CNX_POOL_NAMEREGEX = re.compile(r'[^a-zA-Z0-9._:\-*$#]')class PoolError(BaseException):passdef generate_pool_name(**kwargs):parts = []for key in ('host', 'port', 'database'):try:parts.append(str(kwargs[key]))except KeyError:passif not parts:raise PoolError("Failed generating pool name; specify pool_name")return '_'.join(parts)class PooledHiveConnection(object):def __init__(self, pool, cnx):if not isinstance(pool, HiveConnectionPool):raise AttributeError("pool should be a HiveConnectionPool")if not isinstance(cnx, hive.Connection):raise AttributeError("cnx should be a hive.Connection")self._cnx_pool = poolself._cnx = cnxdef __getattr__(self, attr):return getattr(self._cnx, attr)def close(self):cnx = self._cnxself._cnx_pool.add_connection(cnx)self._cnx = None@propertydef pool_name(self):return self._cnx_pool.pool_nameclass HiveConnectionPool(object):def __init__(self, pool_size=5, pool_name=None,**kwargs):self._pool_size = Noneself._pool_name = Noneself._set_pool_size(pool_size)self._set_pool_name(pool_name or generate_pool_name(**kwargs))self._cnx_config = {}self._cnx_queue = queue.Queue(self._pool_size)if kwargs:self.set_config(**kwargs)cnt = 0while cnt < self._pool_size:self.add_connection()cnt += 1@propertydef pool_name(self):return self._pool_name@propertydef pool_size(self):return self._pool_sizedef set_config(self, **kwargs):if not kwargs:returnwith CONNECTION_POOL_LOCK:try:hive.Connection(**kwargs)self._cnx_config = kwargsexcept AttributeError as err:raise PoolError("Connection configuration not valid: {0}".format(err))def _set_pool_size(self, pool_size):if pool_size <= 0 or pool_size > CNX_POOL_MAXSIZE:raise AttributeError("Pool size should be higher than 0 and ""lower or equal to {0}".format(CNX_POOL_MAXSIZE))self._pool_size = pool_sizedef _set_pool_name(self, pool_name):if CNX_POOL_NAMEREGEX.search(pool_name):raise AttributeError("Pool name '{0}' contains illegal characters".format(pool_name))if len(pool_name) > CNX_POOL_MAXNAMESIZE:raise AttributeError("Pool name '{0}' is too long".format(pool_name))self._pool_name = pool_namedef _queue_connection(self, cnx):if not isinstance(cnx, hive.Connection):raise PoolError("Connection instance not subclass of MySQLConnection.")try:self._cnx_queue.put(cnx, block=False)except queue.Full:raise PoolError("Failed adding connection; queue is full")def add_connection(self, cnx=None):with CONNECTION_POOL_LOCK:if not self._cnx_config:raise PoolError("Connection configuration not available")if self._cnx_queue.full():raise PoolError("Failed adding connection; queue is full")if not cnx:cnx = hive.Connection(**self._cnx_config)else:if not isinstance(cnx, hive.Connection):raise PoolError("Connection instance not subclass of MySQLConnection.")self._queue_connection(cnx)def get_connection(self):with CONNECTION_POOL_LOCK:try:cnx = self._cnx_queue.get(block=False)except queue.Empty:raise PoolError("Failed getting connection; pool exhausted")return PooledHiveConnection(self, cnx)def _remove_connections(self):with CONNECTION_POOL_LOCK:cnt = 0cnxq = self._cnx_queuewhile cnxq.qsize():try:cnx = cnxq.get(block=False)cnx.close()cnt += 1except queue.Empty:return cntexcept PoolError:raisereturn cntclass ReallyHiveConnectionPool(HiveConnectionPool):def __init__(self, **hive_config):pool_size = hive_config.get('pool_size', 10)self._semaphore = threading.Semaphore(pool_size)super().__init__(**hive_config)def get_connection(self):self._semaphore.acquire()return super().get_connection()def put_connection(self, con):con.close()self._semaphore.release()
连接池代码样例
连接池如何使用,样例如下。
from contextlib import contextmanagerfrom hive_pool import ReallyHiveConnectionPoolhive_config = {'host': '***.***.***.***','port': '10000','database': 'default'
}conxpool = ReallyHiveConnectionPool(pool_size=10, pool_name='myhive', **hive_config)@contextmanager
def get_cursor():try:# con = hive.Connection(**hive_config)con = conxpool.get_connection()cursor = con.cursor()yield cursorfinally:cursor.close()# con.close()conxpool.put_connection(con)class MYPyHive(object):"""创建python操作hive类"""@staticmethoddef get_all(sql):with get_cursor() as cursor:cursor.execute(sql)return cursor.fetchall()if __name__ == '__main__':def t(n):ph = MYPyHivehive_query = "show tables"r = ph.get_all(hive_query)print(str(n) + str(r))import timefrom concurrent.futures import ThreadPoolExecutors = time.time()# for i in range(20):# t(i)with ThreadPoolExecutor(max_workers=15) as pool:for i in range(20):pool.submit(t, (i))print(time.time() - s)
pyhive数据库连接池使用相关推荐
- c#打开数据库连接池的工作机制_数据库连接池-tomcat-jdbc使用笔记
现在 主流的数据库连接池有:Proxool.C3P0.DBCP.tomcat-jdbc.Druid.其中tomcat-jdbc是tomcat服务器比较可靠的 数据库连接池. Tomcat 在 7.0 ...
- Druid数据库连接池超时问题com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 1000, active 10
问题描述: com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 1000, active 10at com.alibab ...
- Druid数据库连接池使用参考
一:添加相应依赖 druid-1.0.9.jar: mysql-connector-java-5.1.48-bin.jar 二:编写properties文件 放置位置在src中: driverClas ...
- mysql连接池为何不用nio_为什么要用数据库连接池?
1.为什么要用数据库连接池? 最原始的数据库使用就是打开一个连接并进行使用,使用过后一定要关闭连接释放资源.由于频繁的打开和关闭连接对jvm包括数据库 都有一定的资源负荷,尤其应用压力较大时资源占用比 ...
- net core mysql 连接池_EF Core 小坑:DbContextPool 会引起数据库连接池连接耗尽
EF Core 小坑:DbContextPool 会引起数据库连接池连接耗尽 发布时间:2019-02-18 22:05, 浏览次数:1152 , 标签: EF Core DbContextPool ...
- 数据库连接池,实现及分析
在我们日常对数据库操作时存在一个问题,要为每次数据操作请求建立一个数据库连接.而每次建立连接都需要花费很多开销,如加载驱动类.注册驱动.获取连接,这样如果在短时间内连接多次,就 会耗费多余的时间(加载 ...
- swoole实现数据库连接池
2019独角兽企业重金招聘Python工程师标准>>> 原生 PHP CURD 让我们来回顾一下PHP中数据库的使用 <?php # curd.php$id = 1;$dbh ...
- 聊一个不常见的面试题:为什么数据库连接池不采用 IO 多路复用?
欢迎关注方志朋的博客,回复"666"获面试宝典 今天我们聊一个不常见的 Java 面试题:为什么数据库连接池不采用 IO 多路复用? 这是一个非常好的问题.IO多路复用被视为是非常 ...
- 为什么数据库连接池不采用 IO 多路复用?
欢迎关注方志朋的博客,回复"666"获面试宝典 接着,今天我们聊一个不常见的 Java 面试题:为什么数据库连接池不采用 IO 多路复用? 这是一个非常好的问题.IO多路复用被视为 ...
- 数据库连接池为什么要用threadlocal呢?不用会怎样?
点击关注公众号,Java干货及时送达 来源:blog.csdn.net/qq_42405666/article/details/108258820 这个问题我疑问了很久很久,主要如下截图. 个连接对应 ...
最新文章
- 对于任天堂你了解多少?
- 刘夏真的简历中国科学院计算机所,专家人才库数据----中国科学院计算技术研究所...
- ILockBytes Windows Mobile 6.5
- 必备面试题:系统CPU飙高和GC频繁,如何排查?
- 【Spring注解系列10】SpringBean的生命周期
- MyEclipse Build path contains duplicate entry
- 在钉钉上怎么手写_胖·评测|亲测!磐度A5数字纸笔手写板能适配多少直播平台?...
- java1.5特性_JDK核心API:Java1.5语言新特性简单总结
- 深度探秘.NET 5.0
- k层交叉检验(k-flod cross-validation)
- Python学习入门基础教程(learning Python)--5.2 Python读文件基础
- AfxMessageBox详细使用说明
- win10家庭版添加组策略编辑器,禁用系统自动更新
- 关于Tacotron2看这一篇就够了
- mysql explain 类似_Oracle有没有类似MySQL中的explain功能
- ODC 3.4.0 现已上线,让数据库开发更简单
- html如何添加qq聊天框
- 新浪微博开放平台开发总结
- 干货精选 | 迅雷链再度亮相“魔都”上海,性能与安全兼得的区块链为何备受关注?...
- docker oxidized时区问题,时间显示不是北京时间问题的解决办法
热门文章
- 微信小游戏引擎插件,Creator 使用教程!
- MES系统的功能详细以及应用价值介绍
- 有效需求预测的四大优势
- 群辉安装linux软件下载,群晖系统Synology DSM安装ipkg包管理和套件安装方法
- 日文简历 模板
- webstorm 主题设置 皮肤设置
- 银河麒麟linux找不到网卡,银河麒麟(Ubuntu)无法上网问题的解决方法
- JAVA JSP javaweb餐厅点餐系统源码(点餐系统)点餐系统网上订餐系统在线订餐系统
- HiJson(Json格式化工具)64位中文版下载 v2.1.2
- kuangbin RMQ