简介

天不生我李淳罡,剑道万古如长夜。

Hikari [hi·ka·'lē] 是日语“光”的意思。HikariCP的卖点是快、简洁、可靠,整体非常轻量,只有130Kb左右。

HikariCP的出现可以说是颠覆了连接池领域,直接将性能做到极致,从此之后再无人可做出大的突破。就连曾经风靡一时的BoneCP也主动停止维护,让贤于他。而且,在Spring Boot 2.0中,HikariCP凭借其优越的性能取代Tomcat成为默认的数据库连接池。

HikariCP的性能如下图所示,每毫秒可执行5万次左右的连接操作(DataSource.getConnection()/Connection.close()),是c3p0的200倍。每毫秒可执行14.6万+的Statement操作(Connection.prepareStatement(), Statement.execute(), Statement.close())。dbcp2和Tomcat在Statement基准测试中由于超过了GC次数上限导致OutOfMemoryError而未能完成测试。

HikariCP用户反馈的部署前后连接的稳定性对比图。

连接池

以史为鉴,可以知兴替。

连接池(Connection Pool)技术的核心思想就是:连接复用,通过建立一个数据库连接池以及一套连接使用、分配、管理策略,使得该连接池中的连接可以得到高效、安全的复用,避免了数据库连接频繁建立、关闭的开销。

在不使用连接池的情况下,每次请求都需要经过如下过程:

  1. 与MySQL服务器三次握手建立TCP连接;
  2. 登录认证建立MySQL连接;
  3. 执行SQL语句获取结果;
  4. 断开与MySQL的连接;
  5. 关闭TCP连接。

使用连接池后只需要在初始化时执行一次建立TCP连接和登录认证流程,之后连接池会维护指定数量的连接资源,当有请求时直接从连接池中获取连接执行SQL即可,SQL执行完毕后归还连接给连接池,而无需关闭连接,待应用关闭时由连接池负责将申请的连接资源释放即可。

数据库连接池相关技术发展至今已非常成熟了,使用比较广泛的有BoneCP、DBCP、C3P0、Druid等。

C3P0

C3P0(Why C3P0)在很长一段时间内一直是Java领域内数据库连接池的代名词,当年盛极一时的Hibernate都将其作为内置的数据库连接池,可见业内对它还是认可的。C3P0的功能简单易用、稳定性好,但是性能上的缺点却让它最终被打入冷宫。C3P0的性能差到即便是和同时代的产品相比它也是垫底的,更不用和Druid、HikariCP相比了。正常来讲,有问题很正常,改就是了,但C3P0最致命的问题就是架构设计过于复杂,让重构变成了一项不可能完成的任务。最终,性能有硬伤的C3P0也彻底的退出了历史舞台。

DBCP

DBCP(DataBase Connection Pool)属于Apache顶级项目Commons中的核心子项目,在Apache的生态圈中的影响十分广泛,Tomcat就在其内部集成了DBCP,实现JPA规范的OpenJPA,也默认集成了DBCP。但DBCP并不是独立实现连接池功能的,它内部依赖于Commons中的另一个子项目pool。连接池最核心的“池”,就是由pool组件提供的,因此,DBCP的性能实际上就是pool的性能。但有很长一段时间pool都停留在1.x版本,导致DBCP也更新乏力,许多依赖DBCP的应用在遇到性能瓶颈后别无选择,只能将其替换掉。Tomcat就在其7.0版本中重新设计开发了一套连接池–Tomcat JDBC Pool

2013年9月Commons-Pool 2.0 版本发布,事情迎来转机,DBCP 2.0版本在2014年2月发布,基于新的线程模型全新设计的“池”让DBCP重焕青春,虽然和新一代的连接池相比仍有一定差距,但DBCP 2.x版本已经稳稳达到了和新一代产品同级别的性能指标。

然而长时间的等待已经消磨了用户的耐心,DBCP2与其他产品相比没有任何突出优势。试问,谁会在有选择的前提下,去选择那个并不优秀的呢?如果有,那也只是情怀吧。

BoneCP

在本文开头已提到过BoneCP,它是一个以高性能著称的JDBC连接池。BoneCP的高性能一是源自其极简的设计,整个产品只有几百k大小,二是其重构了内部pool的设计,减少了锁的使用。这两点优化原则,几乎适用于所有的连接池产品。

值得一提的是,BoneCP本身并不“健全”,它的很多特征都依赖于Guava,因此也和DBCP一样面临更新乏力的问题。但现在这些都不重要了,BoneCP引以为傲的性能已被HikariCP全面超越,已主动让贤于与自己设计思路类似的HikariCP。

精简的设计

有道无术,术尚可求,有术无道,止于术。

HikariCP崇尚的设计美学是极简主义,遵循KISS (Keep It Simple Stupid) 的设计哲学。

对连接池来说,要想在性能上做出改进,最直接的做法可能是优化获取连接的逻辑。然而HikariCP获取连接的逻辑和其他连接池的差别并不大,其大部分性能提升来自于对Connection,Statement 等的委托(delegates)的优化。具体包括以下几个方面。

字节码优化

Java的编译过程分为两部分,首先由javac将代码编译成字节码,然后再由解释器将字节码解释为机器码来执行。所以在性能上,Java通常不如C++这类将代码直接编译成CPU所能理解的机器码的编译型语言。为了优化Java的性能 ,JVM在解释器之外引入了即时(Just In Time)编译器:当程序运行时,解释器首先发挥作用,代码可以直接执行。随着时间推移,即时编译器逐渐发挥作用,把越来越多的代码编译优化成本地代码,来获取更高的执行效率。

HikariCP的作者研究了编译器的字节码输出,JIT的汇编输出,将关键链路限制在JIT内联阈值以下。并且扁平化了继承层次结构,隐藏了成员变量,消除了类型转换。

ArrayList–>FastList

减少ConnectionProxy中用来追踪Statement实例的ArrayList实例,当一个Statement被关闭的时候,必须将它从集合中移出,当集合被关闭的时候,必须迭代集合将其中打开的所有statement实例,然后才能清空集合。Java中的ArrayList在get(int index)调用时会进行边界检测,HikariCP中能保证边界,因此这个检查就是无意义的。

另外,在remove(Object)的实现中会从头到尾进行扫描,然而,在JDBC中通常会在使用完后立即关闭Statements或按打开的相反顺序进行关闭,因此从尾部开始扫描性能会更好。因此,HikariCP中用自定义的FastList来替换ArrayList,FastList不进行边界检测且删除元素时从尾部开始扫描。

ConcurrentBag

HikariCP中包含了一个自定义的无锁集合ConcurrentBag。无锁设计、ThreadLocal缓存、

Queue-stealing、hand-off优化使得ConcurrentBag具有高并发、低延迟、最小化伪共享(false-sharing)的特性。

ConcurrentBag是HikariCP连接池中存放连接的容器,属于核心内容,其具体实现我们留到后面代码分析时再进行详细说明。

代理实现

为了为 Connection、Statement 和 ResultSet 实例生成代理,HikariCP 最初使用了一个以ConnectionProxy形式维护在静态字段 (PROXY_FACTORY) 中的单例工厂。有十几个类似的方法:

public final PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException
{return PROXY_FACTORY.getProxyPreparedStatement(this, delegate.prepareStatement(sql, columnNames));
}

用单例工厂生成的字节码如下:

    public final java.sql.PreparedStatement prepareStatement(java.lang.String, java.lang.String[]) throws java.sql.SQLException;flags: ACC_PRIVATE, ACC_FINALCode:stack=5, locals=3, args_size=30: getstatic     #59                 // Field PROXY_FACTORY:Lcom/zaxxer/hikari/proxy/ProxyFactory;3: aload_04: aload_05: getfield      #3                  // Field delegate:Ljava/sql/Connection;8: aload_19: aload_210: invokeinterface #74,  3           // InterfaceMethod java/sql/Connection.prepareStatement:(Ljava/lang/String;[Ljava/lang/String;)Ljava/sql/PreparedStatement;15: invokevirtual #69                 // Method com/zaxxer/hikari/proxy/ProxyFactory.getProxyPreparedStatement:(Lcom/zaxxer/hikari/proxy/ConnectionProxy;Ljava/sql/PreparedStatement;)Ljava/sql/PreparedStatement;18: return

可以看到先是getstatic调用拿到静态字段PROXY_FACTORY的值,最后invokevirtual调用ProxyFactory实例中的getProxyPreparedStatement()方法。

去掉单例工厂将其替换为有着静态方法(由Javassist生成具体实现)的final类后,Java代码变为:

    public final PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException{return ProxyFactory.getProxyPreparedStatement(this, delegate.prepareStatement(sql, columnNames));}

生成的字节码变为:

    private final java.sql.PreparedStatement prepareStatement(java.lang.String, java.lang.String[]) throws java.sql.SQLException;flags: ACC_PRIVATE, ACC_FINALCode:stack=4, locals=3, args_size=30: aload_01: aload_02: getfield      #3                  // Field delegate:Ljava/sql/Connection;5: aload_16: aload_27: invokeinterface #72,  3           // InterfaceMethod java/sql/Connection.prepareStatement:(Ljava/lang/String;[Ljava/lang/String;)Ljava/sql/PreparedStatement;12: invokestatic  #67                 // Method com/zaxxer/hikari/proxy/ProxyFactory.getProxyPreparedStatement:(Lcom/zaxxer/hikari/proxy/ConnectionProxy;Ljava/sql/PreparedStatement;)Ljava/sql/PreparedStatement;15: areturn

两相对比可看出:

  1. getstatic调用没有了;

  2. invokevirtual调用替换成了更容易被JVM优化的invokestatic;

  3. 堆栈大小从5个元素减少到4个元素。这是因为invokevirtual会先将ProxyFactory实例压入堆栈,当调用getProxyPreparedStatement()时再将其弹出。

Statement Cache

在许多连接池中都会提供PreparedStatement缓存,而HikariCP中则去掉了这个缓存。在连接池层使用statement cache是一个反面模式 (Anti-pattern),与driver层提供的缓存相比,会对应用程序的性能产生负面影响。

在连接池层 PreparedStatements 只能缓存每个连接。如果有 250 个经常执行的查询和一个包含 20 个连接的池,则数据库需要保留 5000 个查询执行计划。同理,连接池必须缓存这么多 PreparedStatements 及其相关的对象图。

大多数主流数据库 JDBC 驱动程序(PostgreSQL、Oracle、MySQL等)已经有一个可以配置的statement缓存。 JDBC 驱动程序处于利用数据库特定功能的独特位置,几乎所有缓存实现都能够跨连接共享执行计划。这意味着这 250 个经常执行的查询会在数据库中产生恰好 250 个执行计划,而不是内存中的 5000 个语句和相关的执行计划。聪明的实现甚至不会在驱动程序级别的内存中保留 PreparedStatement 对象,而只是将新实例附加到现有计划 ID。

Scheduler quanta

当‘一次’运行的线程数超过CPU核数时,操作系统会给每个线程分配一个小的运行时间片,并在线程间切换调度(这样一次切换叫做一个quantum)。当一个时间片用完后,在调度器再给该线程分配下一个时间片之前可能要等待一段‘长’的时间,因此,一个线程要尽可能在他自己的时间片里完成,并避免锁强制它放弃自己的时间片就显得至关重要,否则就会有大的性能损耗。

CPU缓存行失效

CPU缓存行失效是除了锁之外,另一个会导致线程无法在一个quanta中完成的因素。

当线程被调度器再次唤起时,它确实获得了再次在其经常访问的数据上运行的机会,然而这些数据可能已经不再位于CPU的L1 或 L2 缓存中了,因为我们没法控制下次调度会被分配给哪个CPU核。

优雅的实现

这一拳二十年的功夫,你挡得住吗? --《霍元甲》

HikariCP 的源码少且精,可读性非常高。如果你想提升自己的Java多线程编程能力,可以来看看HikariCP的源码,字里行间皆透露出作者是一个具有多年多线程编程实战经验的高手。

核心类

HikariPool负责对连接的管理,其中维护了一个ConcurrentBag,里面存放着连接对象PoolEntry,getConnection逻辑就是从ConcurrentBag中borrow一个可用的连接PoolEntry,再经过ProxyFactory对连接实例进行处理后返回代理连接给HikariDataSource使用。


获取连接

  • HikariDataSource#getConnection

说明:

  1. 该方法的主体是使用 双重检查锁定模式 获取HikariPool对象,如果对象为空则进行初始化,实际getConnection逻辑在HikariPool中;

  2. 从上述流程图可知HikariCP是在第一次getConnection时对HikariPool进行的初始化,一旦启动相应配置就固化了(sealed),不允许再次修改(除非使用HikariConfigMXBean方法);

  3. HikariPool类型的对象有两个:fastPathPool和pool,如果使用的是HikariDataSource的默认构造函数则fastPathPool为null,如果使用指定配置的构造函数则pool和fastPathPool等价。二者的差别在于getConnection时pool会因为延迟初始化检查导致性能有轻微下降。

  • HikariPool#getConnection

说明:

  1. suspendResumeLock:通过对信号量进行简单封装实现了一个锁,用于控制获取连接的频率。如果isAllowPoolSuspension配置为true则创建一个最大并发量10000的Semaphore,否则给一个假的锁实现。

  2. 获取poolEntry后判断其是否可用时,如果连接没有被标记为evicted,则会进行连接判活,判断连接距上次访问的时间间隔是否大于500ms(默认值,可配置),如果大于则继续判断连接是否dead。从中可看出HikariCP的连接判活还是挺频繁的,这说明连接判活对其性能影响并不大,这也是得益于其无锁实现。

  3. isConnectionDead方法用于判断连接是否已失活,需要注意的是这里的连接不是PoolEntry对象,而是其持有的实际驱动的Connection对象。如果支持jdbc4协议,则执行驱动程序的isValid判断,否则执行开销较大的createStatement+execute的逻辑。比较取巧的是HikariCP是通过connectionTestQuery是否为null来判断是否支持jdbc4协议的。因此,如果是jdbc4驱动的话不要配置connectionTestQuery,这个配置只是给旧的不支持Connection.isValid的版本使用的

  4. HikariPool中维护了一个ConcurrentBag,里面存放着连接对象PoolEntry,getConnection逻辑就是从ConcurrentBag中borrow一个可用的连接PoolEntry。

初始化池对象

public HikariPool(final HikariConfig config){super(config); // 继承自PoolBase,基本属性设置// initializeDataSource 底层DataSource设置,jdbcUrl、username、password等this.connectionBag = new ConcurrentBag<>(this); // 真正存放连接的对象,核心!this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();// 初始化一个ScheduledExecutorService,用于Housekeeping:// 连接泄露检测、定时清理闲置连接、延迟关闭连接 checkFailFast(); // initializationFailTimeout>0 则进行DB连通性检测if (config.getMetricsTrackerFactory() != null) {setMetricsTrackerFactory(config.getMetricsTrackerFactory());}else {setMetricRegistry(config.getMetricRegistry());}setHealthCheckRegistry(config.getHealthCheckRegistry());handleMBeans(this, true);ThreadFactory threadFactory = config.getThreadFactory();final int maxPoolSize = config.getMaximumPoolSize();LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);// 用于添加连接的线程池,核心和最大线程数为1,队列为addConnectionQueuethis.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new CustomDiscardPolicy());// 用于异步关闭物理连接的线程池,核心和最大线程数为1,maxPoolSize大小的LinkedBlockingQueuethis.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());// 初始化连接泄露告警器this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);// 定时任务,30s调度一次HouseKeeper,用于淘汰连接、维护最小连接数this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));final long startTime = currentTime();while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {quietlySleep(MILLISECONDS.toMillis(100));}addConnectionExecutor.setCorePoolSize(1);addConnectionExecutor.setMaximumPoolSize(1);}}

说明:

  1. 上述主体逻辑为初始化连接池属性、DataSource属性、初始化存放连接的ConcurrentBag、初始化用于管理连接的线程池和调度任务、初始化监控器。

  2. 如果设置了初始化超时时间,则会进行联通检测/连接池预热,具体流程为创建一个连接对象PoolEntry,放到连接池的connectionBag中,代码见checkFailFast。

连接池管理

HikariPool负责对连接池的管理,包括连接池扩充、连接池缩容、连接池关闭等。

连接池扩充

连接池在淘汰连接、关闭连接后,以及连接池从挂起变为启用之前,会进行连接池扩充,以保证minimumIdle。

   private synchronized void fillPool(final boolean isAfterAdd){// 获取空闲连接数:统计connectionBag中状态为STATE_NOT_IN_USE的连接数final var idle = getIdleConnections();final var shouldAdd = getTotalConnections() < config.getMaximumPoolSize() && idle < config.getMinimumIdle();if (shouldAdd) {final var countToAdd = config.getMinimumIdle() - idle;for (int i = 0; i < countToAdd; i++)// 异步添加连接,postFillPoolEntryCreator与poolEntryCreator的区别只是日志前缀// addConnectionExecutor 核心和最大线程数为1addConnectionExecutor.submit(isAfterAdd ? postFillPoolEntryCreator : poolEntryCreator);}else if (isAfterAdd) {logger.debug("{} - Fill pool skipped, pool has sufficient level or currently being filled.", poolName);}}// PoolEntryCreator的核心逻辑是创建连接createPoolEntry,并将其加到connectionBag中// 创建链接对象的逻辑如下private PoolEntry createPoolEntry(){try {// 从DataSource获取物理连接,设置到poolEntryfinal var poolEntry = newPoolEntry();// maxLifetime>0 houseKeepingExecutorService延迟执行MaxLifetimeTask任务final var maxLifetime = config.getMaxLifetime();if (maxLifetime > 0) {// variance up to 2.5% of the maxlifetimefinal var variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;final var lifetime = maxLifetime - variance;poolEntry.setFutureEol(houseKeepingExecutorService.schedule(new MaxLifetimeTask(poolEntry), lifetime, MILLISECONDS));}// houseKeepingExecutorService定期执行KeepaliveTaskfinal long keepaliveTime = config.getKeepaliveTime();if (keepaliveTime > 0) {// variance up to 10% of the heartbeat timefinal var variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);final var heartbeatTime = keepaliveTime - variance;poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(new KeepaliveTask(poolEntry), heartbeatTime, heartbeatTime, MILLISECONDS));}return poolEntry;}catch (ConnectionSetupException e) {if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrentlylogger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());lastConnectionFailure.set(e);}}catch (Exception e) {if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrentlylogger.debug("{} - Cannot acquire connection from data source", poolName, e);}}return null;}
  • MaxLifetimeTask 与 KeepaliveTask
   // 当连接poolEntry达到设置的maxLifetime时,执行任务将其从连接池中剔除private final class MaxLifetimeTask implements Runnable{private final PoolEntry poolEntry;MaxLifetimeTask(final PoolEntry poolEntry){this.poolEntry = poolEntry;}public void run(){if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {// 旧连接被剔除后,判断是否需要添加新连接addBagItem(connectionBag.getWaitingThreadCount());}}}@Overridepublic void addBagItem(final int waiting){// 当等待线程数大于添加连接的queue size时,增加一个添加连接的任务// 本来需要waiting个连接,现在可用连接又被淘汰了一个,需要再添加一个if (waiting > addConnectionExecutor.getQueue().size())addConnectionExecutor.submit(poolEntryCreator);}// 定时执行连接保活任务private final class KeepaliveTask implements Runnable{private final PoolEntry poolEntry;KeepaliveTask(final PoolEntry poolEntry){this.poolEntry = poolEntry;}public void run(){// 将连接置为reserved,如果物理连接可用再将连接置为可用// 如果物理连接不可用,则将该连接淘汰,并按需要添加新连接if (connectionBag.reserve(poolEntry)) {if (isConnectionDead(poolEntry.connection)) {softEvictConnection(poolEntry, DEAD_CONNECTION_MESSAGE, true);addBagItem(connectionBag.getWaitingThreadCount());}else {connectionBag.unreserve(poolEntry);logger.debug("{} - keepalive: connection {} is alive", poolName, poolEntry.connection);}}}}

连接池缩容

在连接池初始化时,会初始化一个默认30s执行一次的houseKeeperTask,用于定期将连接池中失效的连接清理掉,同时保持要求的最小空闲链接数。

// HouseKeeper.run()/*** 时间回拨:由于网络时间校准或人工设置,导致系统时间跳回到过去的某个时间* 时间回拨检测,如果时间回退到过去则淘汰所有连接* 如果时间往前拨,则不做处理,因为这除了会加速连接被淘汰外无其他影响*/   // Detect retrograde time, allowing +128ms as per NTP spec.if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",poolName, elapsedDisplayString(previous, now));previous = now;softEvictConnections();return; // 发生时间回拨时任务中断执行,不再执行后续清理和扩充逻辑}else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {// No point evicting for forward clock motion, this merely accelerates connection retirement anywaylogger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));}previous = now;if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {logPoolState("Before cleanup ");// 取出所有空闲连接,淘汰其中已超时连接final var notInUse = connectionBag.values(STATE_NOT_IN_USE);var maxToRemove = notInUse.size() - config.getMinimumIdle();for (PoolEntry entry : notInUse) {if (maxToRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {closeConnection(entry, "(connection has passed idleTimeout)");maxToRemove--;}}logPoolState("After cleanup  ");}elselogPoolState("Pool ");// 可能淘汰了一些连接,因此需要触发连接池扩充流程fillPool(true); // Try to maintain minimum connections
  • 淘汰连接
   // 淘汰连接:撤销连接泄露报警任务,软淘汰连接public void evictConnection(Connection connection){var proxyConnection = (ProxyConnection) connection;proxyConnection.cancelLeakTask();try {softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);}catch (SQLException e) {// unreachable in HikariCP, but we're still forced to catch it}}// 软淘汰一个连接,将连接标记为evict// 如果是连接的owner 或者 连接是空闲的(可以被置为reserved),则进一步关闭连接private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner){poolEntry.markEvicted();if (owner || connectionBag.reserve(poolEntry)) {closeConnection(poolEntry, reason);return true;}return false;}// 关闭连接void closeConnection(final PoolEntry poolEntry, final String closureReason){// 将待关闭连接poolEntry从connectionBag移除if (connectionBag.remove(poolEntry)) {// 关闭poolEntry连接:endOfLife、keepalive、connection置为nullfinal var connection = poolEntry.close();// 线程池异步关闭物理连接closeConnectionExecutor.execute(() -> {quietlyCloseConnection(connection, closureReason);if (poolState == POOL_NORMAL) {fillPool(false); // 关闭连接后需要扩充连接池保证最小连接数}});}}

连接池关闭

   // 关闭pool,关闭所有空闲连接,关闭活动连接public synchronized void shutdown() throws InterruptedException{try {poolState = POOL_SHUTDOWN; // 关闭连接池if (addConnectionExecutor == null) { // pool never startedreturn;}logPoolState("Before shutdown ");// 关闭houseKeeperTask,都要关了,没必要维护最小空闲连接了,直接在后面关闭所有连接即可if (houseKeeperTask != null) {houseKeeperTask.cancel(false);houseKeeperTask = null;}// 关闭所有空闲连接,对connectionBag中的每个连接执行softEvictConnection逻辑// owner参数为false,所以只有实际是空闲的才会执行closeConnection逻辑,否则只是标记为evictedsoftEvictConnections();// 关闭添加连接线程池addConnectionExecutor.shutdown();if (!addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS)) {logger.warn("Timed-out waiting for add connection executor to shutdown");}// 关闭HouseKeeping线程池destroyHouseKeepingExecutorService();// 关闭存放连接的connectionBag,closed=trueconnectionBag.close();// 派一组刺客杀掉所有连接,形象的名字:)final var assassinExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection assassinator",config.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());try {final var start = currentTime();do {// 干掉所有活着的连接:关闭poolEntry、中止物理连接、从connectionBag中移除poolEntryabortActiveConnections(assassinExecutor);// 再次关闭所有空闲连接softEvictConnections();// 给10s时间赶尽杀绝} while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));}finally {assassinExecutor.shutdown(); // 关闭刺客组织if (!assassinExecutor.awaitTermination(10L, SECONDS)) {logger.warn("Timed-out waiting for connection assassin to shutdown");}}shutdownNetworkTimeoutExecutor();closeConnectionExecutor.shutdown();if (!closeConnectionExecutor.awaitTermination(10L, SECONDS)) {logger.warn("Timed-out waiting for close connection executor to shutdown");}}finally {logPoolState("After shutdown ");handleMBeans(this, false);metricsTracker.close();}}

ConcurrentBag

ConcurrentBag是一个专为连接池设计的并发包,它比LinkedBlockingQueue 和 LinkedTransferQueue有着更好的性能。它使用ThreadLocal来尽可能避免锁定,但当ThreadLocal列表中没有可用元素时也会去扫描公共集合。也就是说ThreadLocal列表中未使用的元素可以被其他borrowing线程窃取。

需要注意的是从ConcurrentBag中borrow出去的元素实际上并没有从容器中移除,也就是说即使没有引用也不会发生GC,因此必须注意对借出的对象进行requite,否则会导致内存泄露。只有remove方法才能将对象从ConcurrentBag中完全移除。

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
{private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);// 存放连接PoolEntry的地方// CopyOnWriteArrayList线程安全,读操作无锁,写操作通过创建新副本实现// 读写分离,适用于读多写少场景private final CopyOnWriteArrayList<T> sharedList;// 是否使用WeakReference?默认falseprivate final boolean weakThreadLocals;// 线程维度的缓存,从sharedList中拿到的连接会缓存到线程里,borrow时先从缓存里private final ThreadLocal<List<Object>> threadList;// 内部接口,HikariPool实现了该接口,用于添加连接,即sharedList添加元素private final IBagStateListener listener;// 当前获取不到连接而发生阻塞的线程数private final AtomicInteger waiters;private volatile boolean closed;// 0容量的同步队列,即产即销,用于做信息传递。// 不能insert元素除非有另一个线程正在尝试remove它。// 公平模式:先进先出queue,非公平模式:后进先出stackprivate final SynchronousQueue<T> handoffQueue;// 内部接口,PoolEntry实现了该接口,用于控制连接的状态,通过状态标记实现了无锁设计public interface IConcurrentBagEntry{int STATE_NOT_IN_USE = 0;int STATE_IN_USE = 1;int STATE_REMOVED = -1;int STATE_RESERVED = -2;boolean compareAndSet(int expectState, int newState);void setState(int newState);int getState();}// 省略...
}

状态机

borrow

borrow用于在指定时间内从ConcurrentBag中获取一个连接,如果超时未拿到则返回null。会先从缓存ThreadLocal中拿,无可用连接时再从公共sharedList中拿,sharedList也无可用连接,则需要新增连接。

    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException{// Try the thread-local list firstfinal var list = threadList.get();// 从后往前判断,因为回收连接时add操作会将刚回收的放在末尾,尾部拿到空闲连接的概率大for (int i = list.size() - 1; i >= 0; i--) {final var entry = list.remove(i); // 先remove,回收时再add进来@SuppressWarnings("unchecked")final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// Otherwise, scan the shared list ... then poll the handoff queuefinal int waiting = waiters.incrementAndGet(); // threadList没拿到连接waiters就会加1try {for (T bagEntry : sharedList) {if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {// If we may have stolen another waiter's connection, request another bag add.if (waiting > 1) { // 有阻塞线程在等着获取连接,更新添加连接的队列大小为waiting-1listener.addBagItem(waiting - 1); // 当前线程已获取连接,所以要-1}return bagEntry;}}// 当前线程未获取连接,等待添加的连接数仍然是waitinglistener.addBagItem(waiting);// 到这里才开始算timeout,也就是timeout控制的是新增连接线程池添加连接的超时时间// 不包括从ThreadLocal缓存和sharedList中获取连接的耗时,但注意while循环条件判断时做了10ms的偏移timeout = timeUnit.toNanos(timeout);do {final var start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null; // 仍然没拿到连接,返回null,HikariPool抛超时异常}finally {waiters.decrementAndGet(); // 当前请求处理结束,waiters-1}}

add

添加一个新的连接对象到ConcurrentBag中,给其他等待创建连接的线程borrow。addConnectionExecutor异步添加连接时会调用该方法。

handoffQueue起到线程间通信的作用,offer只有在有另一个线程等着接收时才能将元素给出去,即产即销保证borrow线程能及时拿到需要的连接。

   public void add(final T bagEntry){if (closed) {LOGGER.info("ConcurrentBag has been closed, ignoring add()");throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");}sharedList.add(bagEntry); // 添加到公共sharedList里// spin until a thread takes it or none are waiting// 自旋等待直到有线程来获取连接或者已没有等待线程// while+Thread.yield 是一种常见的实现自旋的写法,在开源框架中常见其身影while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {// 自旋等待让出CPU,让同优先级或更高优先级的线程先执行,提高cpu利用率Thread.yield();}}

requite

getConnection时会先将物理连接包装为PoolEntry放进ConcurrentBag,获取连接时从ConcurrentBag中取出后会再包装为ProxyConnection对象给使用方,使用方使用完后会触发ProxyConnection中的close方法回收连接。

ProxyConnection.close 方法会触发PoolEntry.recycle 方法将poolEntry归还给pool,会调用HikariPool.recycle方法,该方法实际执行的是ConcurrentBag.requite方法,requite方法负责将用完的连接重置为可用状态,达到连接复用的目的。

如果被borrow的连接没有被requite则会导致内存泄露。

   public void requite(final T bagEntry){bagEntry.setState(STATE_NOT_IN_USE); // 将连接状态置为空闲// 如果存在等待线程,将归还的连接直接通过handoffQueue传递给borrow使用// borrow时threadList中没有waiters会+1for (var i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;}// 每255次进行一次10ms的小憩,防止并发过高waiters过多导致cpu占用过高else if ((i & 0xff) == 0xff) { // LockSupport.parkNanos挂起当前线程不再被调度,// 直到超时或有其他线程调用unpark释放该线程的许可(HikariCP中没有调用unpark)parkNanos(MICROSECONDS.toNanos(10)); }else { // 当前循环没有线程poll连接,让出CPU,等下一个循环Thread.yield();}}// 没有waiters且没有线程使用该连接,则将该空闲连接放到threadLocalList尾部final var threadLocalList = threadList.get();if (threadLocalList.size() < 50) {threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);}}

注意:add中while+yield与requite中for+yield+parkNanos的区别 #1305

  • requite中的连接(bagEntry)在自旋时可能改变状态,这是因为其引用可能被其他线程的ThreadLocal所持有,因此在自旋时必须检查连接状态。而这种状态改变是不可能发生在add中的,因为add时不会有其他线程拥有这个新连接的引用。

  • 对于yield和yield+parkNanos,循环中只有yield可能导致一个CPU核被打到100%,在requite中可能有很多线程,需要偶尔调用parkNanos以防跨多核使用CPU。在add时因为添加连接是一个单线程,所以影响并不大。

remove

从connectionBag中移除连接,只有在要关闭对应物理连接时才会执行该方法。

   public boolean remove(final T bagEntry){// 只有borrow出去的或reserve的连接能被removed// 通过CAS将状态置为REMOVEDif (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);return false;}// 从sharedList中移出元素,CopyOnWriteArrayList.removefinal boolean removed = sharedList.remove(bagEntry);if (!removed && !closed) {LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);}// 从ThreadLocal中移出元素threadList.get().remove(bagEntry);return removed;}

reserve & unreserve

   // reserve用于使连接不可被borrow,主要用于清理空闲连接时使用public boolean reserve(final T bagEntry){// 只能将空闲连接置为reservedreturn bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);}// unreserve用于KeepaliveTask,poolEntry会先reserve// 如果物理连接仍然可用,则通过unreserve再将其变为空闲连接public void unreserve(final T bagEntry){if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {// spin until a thread takes it or none are waitingwhile (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {Thread.yield();}}else {LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);}}

参数设置

  • 常用参数
参数名 参数说明 默认值 参考值
minimumIdle 连接池空闲连接最小数量。 10 5
maximumPoolSize 最大连接数。 10 10
connectionTimeout 如果不能获取可用连接,抛出异常前的等待时间。 30000 6000
idleTimeout 空闲连接可被清理的超时时长。真正清理是在守护线程(默认每30s一次)执行时进行。不小于10000 600000 600000
maxLifetime 连接的生命时长(毫秒),超时而且没被使用则被释放,不小于30000 1800000 1800000
leakDetectionThreshold 连接出池超时记录时长。默认为0关闭泄露检测,如果要开启需>2000 0 300000
validationTimeout 检验连接存活超时时长,如果断开不能重连,可以尝试调这个值 5000 5000
dataSource.cachePrepStmts 缓存PreparedStatement false false
dataSource.prepStmtCacheSize PreparedStatement缓存大小 25 25
dataSource.prepStmtCacheSqlLimit driver缓存的statement 最大长度 256 256
dataSource.useServerPrepStmts 新版MySQL支持服务端prepared statements,提升性能 false false
connectionTestQuery driver支持JDBC4,强烈建议不设置

注:所有时间参数单位都是毫秒;minimumIdle、maximumPoolSize按场景调整。

  • 连接池不建议设置太大,主要有如下几点原因:
  1. 连接池内的连接是复用的;
  2. 对于分库分表数据源,对每个库都会维持一份连接池;
  3. 对于读写分离的数据源,对每个库会分别维持一份读连接池、一份写连接池,相当于每个库的连接数都翻倍了;
  4. 需要考虑服务所在机器的承受能力,比如一个服务有32个库,每个库连接池最大连接数是100,则单个服务实例最大会有3200个数据库连接,也即仅数据库连接就占用了3200个线程;
  5. 需要考虑数据库服务端能够承受的最大连接数,比如一个服务设置连接池最大连接数是100,如果服务部署了200台,则数据库服务端单个库的连接数最大将达到20000个,数据库服务端可能会被打挂。

总结

Simplicity is prerequisite for reliability. – Edsger Dijkstra

本文介绍了一个牛气哄哄的命名为光的数据库连接池:HikariCP。先简单回顾了数据库连接池的发展,然后重点针对HikariCP的设计哲学和代码实现进行深入学习。

CAS状态设置实现无锁设计、ThreadLocal线程缓存、SynchronousQueue信息传递、队列窃取、双重检查锁、JVM字节码优化、Thread.yield与LockSupport.Park的灵活应用等等各种多线程编程技巧信手拈来,让我们在如诗一般的代码中跟随大师的智慧、享受了一场多线程编程的盛宴,受益匪浅。

参考

  1. 为什么需要数据库连接池_MySQL_赖猫

  2. 源码详解系列(八)–全面讲解HikariCP的使用和源码

  3. 终于理解Spring Boot 为什么青睐HikariCP了,图解的太透彻了!

  4. 伪共享(false sharing),并发编程无声的性能杀手 - cyfonly

  5. 【并发编程】不存储元素的同步阻塞队列SynchronousQueue

  6. Java即时编译器原理解析及实践

HikariCP:一个叫光的JDBC连接池相关推荐

  1. java mssql jdbc_一个简单的Struts JDBC连接池(mssql)

    tomcat 的错误信息: ------------------------------------------------------------ 2006-9-14 16:43:36 org.ap ...

  2. 数据库连接之jdbc连接池

    BC 1. 概念:Java DataBase Connectivity Java 数据库连接, Java语言操作数据库 JDBC本质:官方(sun公司)定义的一套操作所有关系型数据库的规则,即接口,s ...

  3. 数据层优化-jdbc连接池简述、druid简介

    终于回到既定轨道上了,这一篇讲讲数据库连接池的相关知识,线程池以后有机会再结合项目单独写篇文章(自己给自己挖坑,不知道什么时候能填上),从这一篇文章开始到本阶段结束的文章都会围绕数据库和dao层的优化 ...

  4. HikariCP 高性能的 JDBC 连接池

    HikariCP 是一个高性能的 JDBC 连接池组件.下图是性能的比较测试结果: 使用方法: HikariConfig config = new HikariConfig(); config.set ...

  5. jdbc封装mysql_用Java手动封装JDBC连接池(一)

    JDBC存在的问题 代码的冗余:在对数据库进行增删改查时,每个操作的JDBC流程和SQL执行代码的流程都一样,造成代码的冗余,所以我们可以把冗余的部分封装起来,封装之后,我们就不用再去写JDBC流程, ...

  6. 在独立Java应用程序中使用Tomcat JDBC连接池

    这是从我们的客人文章W4G伙伴克拉伦斯豪的作者临春3从A按. 您可能会在文章结尾找到本书的折扣券代码,仅适用于Java Code Geeks的读者! 请享用! 在需要数据访问权限的独立Java应用程序 ...

  7. JDBC连接池和DBUtils

    本节内容: JDBC连接池 DBUtils 一.JDBC连接池 实际开发中"获得连接"或"释放资源"是非常消耗系统资源的两个过程,为了解决此类性能问题,通常情况 ...

  8. jdbc连接池工作原理_JDBC连接实际上如何工作?

    jdbc连接池工作原理 The JDBC Connection interface is a part of java.sql package. The java.sql.Connection int ...

  9. JDBC、封装JDBC连接池、第三方连接池工具

    主要内容: JDBC简介 JDBC来源 通过代码实现JDBC JDBC的改进需求 JDBC改进的代码实现 JDBC使用的设计模式 封装连接池 封装JDBC连接池 ThreadLoacl的使用 Thre ...

最新文章

  1. cloning java_深入浅出Java中的clone克隆方法,写得太棒了!
  2. 论Java程序的运行机制
  3. 在 Windows 上像 Linux 一样使用命令
  4. 用 SAP ABAP 编写的俄罗斯游戏
  5. 使用方法 ros_大白菜的ROS笔记(8)(创建TF广播和监听,内容很多,细节满满)...
  6. python自学行吗-有编程基础Python自学行吗?
  7. 技能大赛 计算机 融合 研究,技能大赛与高职计算机教学的融合研究.doc
  8. js简单实现切换图片上一张下一张功能
  9. Python实现熵权法(正负指标)并计算综合评分—————附增完整代码和测试用例
  10. 惠普803墨盒清零步骤_打印机惠普7110墨盒清零的方法
  11. word修改表格和下方段落的间距
  12. 如何查看电脑操作系统及系统类型
  13. programData
  14. JDK11变化详解JDK8升级JDK11详细指南
  15. shell md5sum命令
  16. Spine 导出视频 音效事件
  17. java随机数函数_java随机函数使用方法Random
  18. QT 版puremvc框架
  19. 14 实例:自动轨迹绘制
  20. 在小程序里使用md5

热门文章

  1. 责任链模式 php,编程中的那些套路——关于责任链模式
  2. Android5.0及以上版本新特性
  3. js 正则删除span标签以及标签里面的内容
  4. HBuilder-简易banner制作
  5. springboot进行mock测试
  6. 大数据和机器学习的关系
  7. ToG产品_产品发布流程_2019_004
  8. Dedekind切割定理,以及用它证明确界存在定理
  9. python字符串的基本操作_python字符串的基本操作
  10. onmouseout方法