python连接hive的工具可以用 pyhive 和 impala,不管是哪个配置都比较麻烦。需要的依赖包比较多。

  • https://github.com/cloudera/impyla
  • https://github.com/dropbox/PyHive

pyhive模块没有提供数据库连接池的API。所以自己根据模块 mysql-connector-python 的连接池改装了一个 pyhive 的连接池,效率会提升不少。

连接池介绍

# hive_pool.py
import re
from pyhive import hivetry:import queue
except ImportError:import Queue as queue
import threadingCONNECTION_POOL_LOCK = threading.RLock()
CNX_POOL_MAXSIZE = 32
CNX_POOL_MAXNAMESIZE = 64
CNX_POOL_NAMEREGEX = re.compile(r'[^a-zA-Z0-9._:\-*$#]')class PoolError(BaseException):passdef generate_pool_name(**kwargs):parts = []for key in ('host', 'port', 'database'):try:parts.append(str(kwargs[key]))except KeyError:passif not parts:raise PoolError("Failed generating pool name; specify pool_name")return '_'.join(parts)class PooledHiveConnection(object):def __init__(self, pool, cnx):if not isinstance(pool, HiveConnectionPool):raise AttributeError("pool should be a HiveConnectionPool")if not isinstance(cnx, hive.Connection):raise AttributeError("cnx should be a hive.Connection")self._cnx_pool = poolself._cnx = cnxdef __getattr__(self, attr):return getattr(self._cnx, attr)def close(self):cnx = self._cnxself._cnx_pool.add_connection(cnx)self._cnx = None@propertydef pool_name(self):return self._cnx_pool.pool_nameclass HiveConnectionPool(object):def __init__(self, pool_size=5, pool_name=None,**kwargs):self._pool_size = Noneself._pool_name = Noneself._set_pool_size(pool_size)self._set_pool_name(pool_name or generate_pool_name(**kwargs))self._cnx_config = {}self._cnx_queue = queue.Queue(self._pool_size)if kwargs:self.set_config(**kwargs)cnt = 0while cnt < self._pool_size:self.add_connection()cnt += 1@propertydef pool_name(self):return self._pool_name@propertydef pool_size(self):return self._pool_sizedef set_config(self, **kwargs):if not kwargs:returnwith CONNECTION_POOL_LOCK:try:hive.Connection(**kwargs)self._cnx_config = kwargsexcept AttributeError as err:raise PoolError("Connection configuration not valid: {0}".format(err))def _set_pool_size(self, pool_size):if pool_size <= 0 or pool_size > CNX_POOL_MAXSIZE:raise AttributeError("Pool size should be higher than 0 and ""lower or equal to {0}".format(CNX_POOL_MAXSIZE))self._pool_size = pool_sizedef _set_pool_name(self, pool_name):if CNX_POOL_NAMEREGEX.search(pool_name):raise AttributeError("Pool name '{0}' contains illegal characters".format(pool_name))if len(pool_name) > CNX_POOL_MAXNAMESIZE:raise AttributeError("Pool name '{0}' is too long".format(pool_name))self._pool_name = pool_namedef _queue_connection(self, cnx):if not isinstance(cnx, hive.Connection):raise PoolError("Connection instance not subclass of MySQLConnection.")try:self._cnx_queue.put(cnx, block=False)except queue.Full:raise PoolError("Failed adding connection; queue is full")def add_connection(self, cnx=None):with CONNECTION_POOL_LOCK:if not self._cnx_config:raise PoolError("Connection configuration not available")if self._cnx_queue.full():raise PoolError("Failed adding connection; queue is full")if not cnx:cnx = hive.Connection(**self._cnx_config)else:if not isinstance(cnx, hive.Connection):raise PoolError("Connection instance not subclass of MySQLConnection.")self._queue_connection(cnx)def get_connection(self):with CONNECTION_POOL_LOCK:try:cnx = self._cnx_queue.get(block=False)except queue.Empty:raise PoolError("Failed getting connection; pool exhausted")return PooledHiveConnection(self, cnx)def _remove_connections(self):with CONNECTION_POOL_LOCK:cnt = 0cnxq = self._cnx_queuewhile cnxq.qsize():try:cnx = cnxq.get(block=False)cnx.close()cnt += 1except queue.Empty:return cntexcept PoolError:raisereturn cntclass ReallyHiveConnectionPool(HiveConnectionPool):def __init__(self, **hive_config):pool_size = hive_config.get('pool_size', 10)self._semaphore = threading.Semaphore(pool_size)super().__init__(**hive_config)def get_connection(self):self._semaphore.acquire()return super().get_connection()def put_connection(self, con):con.close()self._semaphore.release()

连接池代码样例

连接池如何使用,样例如下。

from contextlib import contextmanagerfrom hive_pool import ReallyHiveConnectionPoolhive_config = {'host': '***.***.***.***','port': '10000','database': 'default'
}conxpool = ReallyHiveConnectionPool(pool_size=10, pool_name='myhive', **hive_config)@contextmanager
def get_cursor():try:# con = hive.Connection(**hive_config)con = conxpool.get_connection()cursor = con.cursor()yield cursorfinally:cursor.close()# con.close()conxpool.put_connection(con)class MYPyHive(object):"""创建python操作hive类"""@staticmethoddef get_all(sql):with get_cursor() as cursor:cursor.execute(sql)return cursor.fetchall()if __name__ == '__main__':def t(n):ph = MYPyHivehive_query = "show tables"r = ph.get_all(hive_query)print(str(n) + str(r))import timefrom concurrent.futures import ThreadPoolExecutors = time.time()# for i in range(20):#     t(i)with ThreadPoolExecutor(max_workers=15) as pool:for i in range(20):pool.submit(t, (i))print(time.time() - s)

pyhive数据库连接池使用相关推荐

  1. c#打开数据库连接池的工作机制_数据库连接池-tomcat-jdbc使用笔记

    现在 主流的数据库连接池有:Proxool.C3P0.DBCP.tomcat-jdbc.Druid.其中tomcat-jdbc是tomcat服务器比较可靠的 数据库连接池. Tomcat 在 7.0 ...

  2. Druid数据库连接池超时问题com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 1000, active 10

    问题描述: com.alibaba.druid.pool.GetConnectionTimeoutException: wait millis 1000, active 10at com.alibab ...

  3. Druid数据库连接池使用参考

    一:添加相应依赖 druid-1.0.9.jar: mysql-connector-java-5.1.48-bin.jar 二:编写properties文件 放置位置在src中: driverClas ...

  4. mysql连接池为何不用nio_为什么要用数据库连接池?

    1.为什么要用数据库连接池? 最原始的数据库使用就是打开一个连接并进行使用,使用过后一定要关闭连接释放资源.由于频繁的打开和关闭连接对jvm包括数据库 都有一定的资源负荷,尤其应用压力较大时资源占用比 ...

  5. net core mysql 连接池_EF Core 小坑:DbContextPool 会引起数据库连接池连接耗尽

    EF Core 小坑:DbContextPool 会引起数据库连接池连接耗尽 发布时间:2019-02-18 22:05, 浏览次数:1152 , 标签: EF Core DbContextPool ...

  6. 数据库连接池,实现及分析

    在我们日常对数据库操作时存在一个问题,要为每次数据操作请求建立一个数据库连接.而每次建立连接都需要花费很多开销,如加载驱动类.注册驱动.获取连接,这样如果在短时间内连接多次,就 会耗费多余的时间(加载 ...

  7. swoole实现数据库连接池

    2019独角兽企业重金招聘Python工程师标准>>> 原生 PHP CURD 让我们来回顾一下PHP中数据库的使用 <?php # curd.php$id = 1;$dbh ...

  8. 聊一个不常见的面试题:为什么数据库连接池不采用 IO 多路复用?

    欢迎关注方志朋的博客,回复"666"获面试宝典 今天我们聊一个不常见的 Java 面试题:为什么数据库连接池不采用 IO 多路复用? 这是一个非常好的问题.IO多路复用被视为是非常 ...

  9. 为什么数据库连接池不采用 IO 多路复用?

    欢迎关注方志朋的博客,回复"666"获面试宝典 接着,今天我们聊一个不常见的 Java 面试题:为什么数据库连接池不采用 IO 多路复用? 这是一个非常好的问题.IO多路复用被视为 ...

  10. 数据库连接池为什么要用threadlocal呢?不用会怎样?

    点击关注公众号,Java干货及时送达 来源:blog.csdn.net/qq_42405666/article/details/108258820 这个问题我疑问了很久很久,主要如下截图. 个连接对应 ...

最新文章

  1. 对于任天堂你了解多少?
  2. 刘夏真的简历中国科学院计算机所,专家人才库数据----中国科学院计算技术研究所...
  3. ILockBytes Windows Mobile 6.5
  4. 必备面试题:系统CPU飙高和GC频繁,如何排查?
  5. 【Spring注解系列10】SpringBean的生命周期
  6. MyEclipse Build path contains duplicate entry
  7. 在钉钉上怎么手写_胖·评测|亲测!磐度A5数字纸笔手写板能适配多少直播平台?...
  8. java1.5特性_JDK核心API:Java1.5语言新特性简单总结
  9. 深度探秘.NET 5.0
  10. k层交叉检验(k-flod cross-validation)
  11. Python学习入门基础教程(learning Python)--5.2 Python读文件基础
  12. AfxMessageBox详细使用说明
  13. win10家庭版添加组策略编辑器,禁用系统自动更新
  14. 关于Tacotron2看这一篇就够了
  15. mysql explain 类似_Oracle有没有类似MySQL中的explain功能
  16. ODC 3.4.0 现已上线,让数据库开发更简单
  17. html如何添加qq聊天框
  18. 新浪微博开放平台开发总结
  19. 干货精选 | 迅雷链再度亮相“魔都”上海,性能与安全兼得的区块链为何备受关注?...
  20. docker oxidized时区问题,时间显示不是北京时间问题的解决办法

热门文章

  1. 微信小游戏引擎插件,Creator 使用教程!
  2. MES系统的功能详细以及应用价值介绍
  3. 有效需求预测的四大优势
  4. 群辉安装linux软件下载,群晖系统Synology DSM安装ipkg包管理和套件安装方法
  5. 日文简历 模板
  6. webstorm 主题设置 皮肤设置
  7. 银河麒麟linux找不到网卡,银河麒麟(Ubuntu)无法上网问题的解决方法
  8. JAVA JSP javaweb餐厅点餐系统源码(点餐系统)点餐系统网上订餐系统在线订餐系统
  9. HiJson(Json格式化工具)64位中文版下载 v2.1.2
  10. kuangbin RMQ