spring+dbcp连接池源码分析
Spring对数据库连接池的支持
常见的数据库连接池有c3p0
,dbcp
以及druid
,这里使用的是dbcp
。
前文中使用DataSourceUtils
获取和释放connection,代码如下:
//org.springframework.jdbc.datasource.DataSourceUtilspublic abstract class DataSourceUtils {//获取连接public static Connection getConnection(DataSource dataSource) {return doGetConnection(dataSource);}public static Connection doGetConnection(DataSource dataSource) {//spring 事务相关--不做过多分析,后续讲解ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {conHolder.requested();if (!conHolder.hasConnection()) {conHolder.setConnection(dataSource.getConnection());}return conHolder.getConnection();}//1.Connection con = dataSource.getConnection();//spring 事务相关--不做过多分析,后续讲解if (TransactionSynchronizationManager.isSynchronizationActive()) {ConnectionHolder holderToUse = conHolder;if (holderToUse == null) {holderToUse = new ConnectionHolder(con);} else {holderToUse.setConnection(con);}holderToUse.requested();TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource));holderToUse.setSynchronizedWithTransaction(true);if (holderToUse != conHolder) {TransactionSynchronizationManager.bindResource(dataSource, holderToUse);}}return con;}public static void releaseConnection(Connection con, DataSource dataSource) {doReleaseConnection(con, dataSource);}public static void doReleaseConnection(Connection con, DataSource dataSource){if (con == null) {return;}//spring 事务相关 .... 略....doCloseConnection(con, dataSource);}
}
针对上述关于事务的部分:
在spring事务部分会做详细介绍,这里大致说明一下流程:
- 在spring事务方法invoke时,或尝试获取事务信息,在
org.springframework.jdbc.datasource.DataSourceTransactionManager#getTransaction(TransactionDefinition)
- 在该方法第一步,会通过
doGetTransaction()
来从TransactionSynchronizationManager.getResource(this.dataSource)
获取conn信息(ThreadLocal,如果没有则返回null),并绑定到事务对象txObject中- 如果当前事务对象中没有conn,则会新创建一个,然后通过
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
绑定到当前线程中; 这段逻辑在doBegin()
体现;
dbcp.BasicDataSource
根据配置,<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
,
我们得知使用的dataSource具体类型为BasicDataSource
.
//org.apache.commons.dbcp.BasicDataSourcepublic class BasicDataSource implements DataSource {protected volatile GenericObjectPool connectionPool = null;protected volatile DataSource dataSource = null;//配置一系列参数......protected String driverClassName = null;protected String username = null;protected volatile String password = null;protected String url = null;//验证sql,如: select 1 from dual;protected volatile String validationQuery = null;public Connection getConnection() throws SQLException {//a. createDataSource()//b. 根据创建的DataSourceImpl.getConnection(); //这里使用的是PoolingDataSourcereturn createDataSource().getConnection();}protected synchronized DataSource createDataSource(){// 如果dataSource已经创建 直接返回.if (dataSource != null) {return (dataSource);}//1.创建返回原始物理连接的工厂ConnectionFactory driverConnectionFactory = createConnectionFactory();//2.为我们的连接创建一个池createConnectionPool();//statementPoolFactory 略...GenericKeyedObjectPoolFactory statementPoolFactory = null;//3.创建池化的连接工厂createPoolableConnectionFactory(driverConnectionFactory, statementPoolFactory, abandonedConfig);//4.创建dataSource实例,createDataSourceInstance();//5. 连接池 初始化 initialSize 个connectionfor (int i = 0 ; i < initialSize ; i++) {connectionPool.addObject();}return dataSource;}
}
获取连接池分为两步:
- 创建连接工厂-
createConnectionFactory()
- 获取连接池-
getConnection()
createDataSource()
总体的步骤如下图:
- 创建"物理"连接的工厂— createConnectionFactory()
- 创建连接池 — createConnectionPool()
- 创建“池化”的工厂
- 创建"池化"DataSource
- 初始化connection
1. createConnectionFactory()
protected ConnectionFactory createConnectionFactory() throws SQLException {// 创建JDBC driver实例(略....)Driver driver = DriverManager.getDriver(url);// Can't test without a validationQueryif (validationQuery == null) {setTestOnBorrow(false);setTestOnReturn(false);setTestWhileIdle(false);}String user = username;connectionProperties.put("user", user);String pwd = password;connectionProperties.put("password", pwd);//创建 DriverConnectionFactoryConnectionFactory driverConnectionFactory = new DriverConnectionFactory(driver, url, connectionProperties);return driverConnectionFactory;
}
2. createConnectionPool()
protected void createConnectionPool() {// 创建一个连接池以包含所有的活动连接。GenericObjectPool gop;if ((abandonedConfig != null) && (abandonedConfig.getRemoveAbandoned())) {gop = new AbandonedObjectPool(null,abandonedConfig);} else {gop = new GenericObjectPool();}gop.setMaxActive(maxActive);gop.setMaxIdle(maxIdle);gop.setMinIdle(minIdle);gop.setMaxWait(maxWait);gop.setTestOnBorrow(testOnBorrow);gop.setTestOnReturn(testOnReturn);gop.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);gop.setNumTestsPerEvictionRun(numTestsPerEvictionRun);gop.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);gop.setTestWhileIdle(testWhileIdle);connectionPool = gop;
}
createPoolableConnectionFactory()
protected void createPoolableConnectionFactory(ConnectionFactory driverConnectionFactory,KeyedObjectPoolFactory statementPoolFactory, AbandonedConfig configuration){PoolableConnectionFactory connectionFactory = null;//PoolableConnectionFactory的构造函数中://_pool.setFactory(this); //即:将PoolableConnectionFactory设置到connectionPool的_factory属性中.connectionFactory =new PoolableConnectionFactory(driverConnectionFactory,connectionPool,statementPoolFactory,validationQuery,validationQueryTimeout,connectionInitSqls,defaultReadOnly,defaultAutoCommit,defaultTransactionIsolation,defaultCatalog,configuration);validateConnectionFactory(connectionFactory);
}
4. createDataSourceInstance()
protected void createDataSourceInstance() {PoolingDataSource pds = new PoolingDataSource(connectionPool);pds.setAccessToUnderlyingConnectionAllowed(isAccessToUnderlyingConnectionAllowed());pds.setLogWriter(logWriter);dataSource = pds;}
5. 初始化连接池
根据配置的initialSize
属性,来初始n个连接,填充连接池。
for (int i = 0 ; i < initialSize ; i++) {connectionPool.addObject();
}
操作连接池
代码逻辑(粗)
1.初始化连接池
在createDateSource
中会for-each
调用connectionPool.addObject();
初始化连接池:
//org.apache.commons.pool.impl.GenericObjectPool
public void addObject() throws Exception {//1. 此时_factory的具体类型是:PoolableConnectionFactory,创建连接Object obj = _factory.makeObject();addObjectToPool(obj, false);
}
2.获取连接 – getConnection()
由createDataSource
返回的真正类型为PoolingDataSource
//org.apache.commons.dbcp.PoolingDataSourcepublic Connection getConnection() throws SQLException {Connection conn = (Connection)(_pool.borrowObject());if (conn != null) {conn = new PoolGuardConnectionWrapper(conn);} return conn;
}
3.释放连接—releaseConnection()
PoolGuardConnectionWrapper
为PoolingDataSource
的内部类,它由个delegate
成员变量,释放连接的是有delegate
完成的。
//org.apache.commons.dbcp.PoolingDataSource$PoolGuardConnectionWrapperprivate class PoolGuardConnectionWrapper extends DelegatingConnection {private Connection delegate;PoolGuardConnectionWrapper(Connection delegate) {super(delegate);this.delegate = delegate;}public void close() throws SQLException {if (delegate != null) {//delete具体类型为: PoolableConnectionthis.delegate.close();this.delegate = null;super.setDelegate(null);}}
}//org.apache.commons.dbcp.PoolableConnection
public synchronized void close() throws SQLException {if (_closed) {// already closedreturn;}if (!isUnderlyingConectionClosed) {//归还给连接池_pool.returnObject(this);} else {//释放连接(close),_pool.invalidateObject(this); }
}
调用逻辑图
源码分析
PoolableConnectionFactory
//org.apache.commons.dbcp.PoolableConnectionFactorypublic class PoolableConnectionFactory implements PoolableObjectFactory {//1.创建连接对象public Object makeObject() throws Exception {Connection conn = _connFactory.createConnection();//1.1initializeConnection(conn);return new PoolableConnection(conn,_pool,_config);}//1.1 执行配置的_connectionInitSqls 集合,初始化protected void initializeConnection(Connection conn) throws SQLException {Collection sqls = _connectionInitSqls;if(null != sqls) {Statement stmt = conn.createStatement();for (Iterator iterator = sqls.iterator(); iterator.hasNext();){stmt.execute(iterator.next().toString());}stmt.close(); }}//2.销毁连接对象(物理关闭)public void destroyObject(Object obj) throws Exception {if(obj instanceof PoolableConnection) {((PoolableConnection)obj).reallyClose();}}//3.校验连接对象有效性public boolean validateObject(Object obj) {if(obj instanceof Connection) {try {//3.1validateConnection((Connection) obj);return true;} catch(Exception e) {return false;}} else {return false;}}//3.1 执行配置的_validationQuery 集合,验证public void validateConnection(Connection conn) throws SQLException {String query = _validationQuery;if(null != query) {Statement stmt = null;ResultSet rset = null;try {stmt = conn.createStatement();rset = stmt.executeQuery(query);//必须返回一行记录,否则报错if(!rset.next()) {throw new SQLException("validationQuery didn't return a row");}} finally {rset.close();stmt.close();}}}//4. 使连接对象钝化public void passivateObject(Object obj) throws Exception {if(obj instanceof Connection) {Connection conn = (Connection)obj;if(!conn.getAutoCommit() && !conn.isReadOnly()) {conn.rollback();}conn.clearWarnings();if(!conn.getAutoCommit()) {conn.setAutoCommit(true);}}if(obj instanceof DelegatingConnection) {// setLastUsed(0); //将lastUsed设置0// _closed = true;((DelegatingConnection)obj).passivate();}}//5. 激活连接对象public void activateObject(Object obj) throws Exception {if(obj instanceof DelegatingConnection) {// _closed = false;// setLastUsed(); //将lastUsed设置为系统时间ms((DelegatingConnection)obj).activate();}if(obj instanceof Connection) {Connection conn = (Connection)obj;if (conn.getAutoCommit() != _defaultAutoCommit) {conn.setAutoCommit(_defaultAutoCommit);}//修改默认的隔离级别 //NONE ,READ_COMMITTED ,READ_UNCOMMITTED ,REPEATABLE_READ ,SERIALIZABLEif ((_defaultTransactionIsolation != UNKNOWN_TRANSACTIONISOLATION) && (conn.getTransactionIsolation() != _defaultTransactionIsolation)) {conn.setTransactionIsolation(_defaultTransactionIsolation);}if ((_defaultReadOnly != null) && (conn.isReadOnly() != _defaultReadOnly.booleanValue())) {conn.setReadOnly(_defaultReadOnly.booleanValue());}if ((_defaultCatalog != null) &&(!_defaultCatalog.equals(conn.getCatalog()))) {conn.setCatalog(_defaultCatalog);}}}
}
GenericObjectPool
//org.apache.commons.pool.impl.GenericObjectPoolpublic class GenericObjectPool extends BaseObjectPool implements ObjectPool {//从连接池中“借”走且尚未归还的连接对象, 即正在激活状态的连接对象private int _numActive = 0;//正在内部处理的对象(创建或销毁)总数, 不包含(active和idle)状态连接private int _numInternalProcessing = 0;//待分配的连接对象列表(可以理解为请求连接对象列表,按照线程到达顺序)private LinkedList _allocationQueue = new LinkedList();//连接池对象,注意连接池中都是可用对象(idle)private CursorableLinkedList _pool = null;//DEFAULT_TEST_ON_RETURN默认值为false, 标识在归还连接对象时是否需要validateObjectprivate volatile boolean _testOnReturn = DEFAULT_TEST_ON_RETURN; //1.初添加连接对象(此时空闲对象)至连接池中public void addObject() throws Exception {Object obj = _factory.makeObject();//此时的连接对象为空闲状态,所以无需递减正在运行的连接数addObjectToPool(obj, false);}//2.归还连接对象public void returnObject(Object obj) throws Exception {//归还活动的连接对象,decrementNumActive=trueaddObjectToPool(obj, true);}//3.添加连接对象到连接池中//无论是初始化还是return,连接对象在执行完此方法之后都将是空闲状态//decrementNumActive: 是否需要递减正在运行的连接数。private void addObjectToPool(Object obj, boolean decrementNumActive) throws Exception {boolean success = true;if(_testOnReturn && !(_factory.validateObject(obj))) {//如果开启_testOnReturn 且 连接对象未验证通过success = false;} else {//钝化连接对象_factory.passivateObject(obj);}//如果success=false,则需要销毁连接对象(物理销毁)boolean shouldDestroy = !success;synchronized (this) {if (isClosed()) {//如果连接池已经关闭,则直接销毁连接对象shouldDestroy = true;} else {if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {//如果配置了最大空闲数 && 连接池中都是可用对象(idle状态) > 大于最大空闲数//则直接销毁对象shouldDestroy = true;} else if(success) {//采用两种策略,添加连接对象至pool,// LIFO(Last In, First out)// FIFO(First In, First Out)if (_lifo) {_pool.addFirst(new ObjectTimestampPair(obj));} else {_pool.addLast(new ObjectTimestampPair(obj));}if (decrementNumActive) {_numActive--;}allocate();}}}if(shouldDestroy) {//如果shouldDestroy,则直接销毁对象_factory.destroyObject(obj);// 如果shouldDestroy=true,则说明上述的 _lifo逻辑没有执行// 这里根据条件需要执行: _numActive--;和 allocate();逻辑if (decrementNumActive) {synchronized(this) {_numActive--;allocate();}}}}//4.从allocationQueue队列中获取可分配的Latch。private synchronized void allocate() {if (isClosed()) return;// 将_pool中空闲的示例分配给_allocationQueue中的latch 直至pool消耗完全;for (;;) {if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {Latch latch = (Latch) _allocationQueue.removeFirst();latch.setPair((ObjectTimestampPair) _pool.removeFirst());_numInternalProcessing++;synchronized (latch) {latch.notify();}} else {break;}}// 当_pool消耗完之后,而_allocationQueue仍有剩余(即仍有请求未被满足)// 如果未设置最大活动连接数,或者 正在活跃的连接数+正在处理的连接 < 最大活动连接数,则latch.setMayCreate()表示允许新创建;for(;;) {if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {Latch latch = (Latch) _allocationQueue.removeFirst();//latch.setMayCreate(true);_numInternalProcessing++;synchronized (latch) {latch.notify();}} else {break;}}}//5. 从连接池中"借用"连接对象public Object borrowObject() throws Exception {long starttime = System.currentTimeMillis();Latch latch = new Latch();byte whenExhaustedAction;long maxWait;synchronized (this) {whenExhaustedAction = _whenExhaustedAction;maxWait = _maxWait;// latch添加至queue_allocationQueue.add(latch);// 处理_allocationQueue, 分配_pool空闲idle连接实例allocate();}for(;;) {synchronized (this) {assertOpen();}//如果上述allocate()没有从_pool中分配idle的连接实例if(latch.getPair() == null) {//检查 是否允许创建if(latch.mayCreate()) {// 后续新创建连接} else {// 资源已耗尽....switch(whenExhaustedAction) {case WHEN_EXHAUSTED_GROW://GROW: 新创建连接break;case WHEN_EXHAUSTED_FAIL://FAIL:直接抛出异常throw new NoSuchElementException("Pool exhausted");case WHEN_EXHAUSTED_BLOCK://BLOCK策略:等待超过maxWait秒后抛出异常....default://默认策略:抛出IllegalArgumentException异常}}}boolean newlyCreated = false;if(null == latch.getPair()) {//新建对象连接对象...try { Object obj = _factory.makeObject();latch.setPair(new ObjectTimestampPair(obj));newlyCreated = true;} finally {if (!newlyCreated) {// object没有被成功创建//能执行到这里,说明在try{}代码中抛出异常.synchronized (this) {_numInternalProcessing--;allocate();}}}}// activate & validate _factory.activateObject(latch.getPair().value);synchronized(this) {_numInternalProcessing--;_numActive++;}return latch.getPair().value;}}//6.废弃连接对象public void invalidateObject(Object obj) throws Exception {try {if (_factory != null) {//销毁连接对象(物理销毁)_factory.destroyObject(obj);}} finally {synchronized (this) {//活动连接数递减_numActive--;allocate();}}}//7.清空连接池public void clear() {List toDestroy = new ArrayList();synchronized(this) {toDestroy.addAll(_pool);_numInternalProcessing = _numInternalProcessing + _pool._size;_pool.clear();}destroy(toDestroy);}//8.for-each调用_factory.destroyObject(obj);销毁连接对象private void destroy(Collection c) {for (Iterator it = c.iterator(); it.hasNext();) {try {_factory.destroyObject(((ObjectTimestampPair)(it.next())).value);} finally {synchronized(this) {//_numInternalProcessing 递减_numInternalProcessing--;allocate();}}}}//9.当前连接池已经关闭(注意不是连接对象关闭)protected final boolean isClosed() {return closed;}
}
其中borrowObject
的逻辑较为复杂,它的大致逻辑图如下:
spring+dbcp连接池源码分析相关推荐
- mybatisplus 集成druid连接池源码分析
mybatisplus 集成druid连接池源码分析:从spring的源码过渡到druid的相关jar包,里面是druid相关的类,下面我们开始分析: 1.取数据库连接的地方入口:public abs ...
- 线程池源码分析-FutureTask
1 系列目录 线程池接口分析以及FutureTask设计实现 线程池源码分析-ThreadPoolExecutor 该系列打算从一个最简单的Executor执行器开始一步一步扩展到ThreadPool ...
- cl.zk0.info/index.php,兄弟连区块链入门到精通教程btcpool矿池源码分析环境搭建
原标题:兄弟连区块链入门到精通教程btcpool矿池源码分析环境搭建 btcpool矿池-测试环境搭建及使用cgminer测试 本文档基于Ubuntu 16.04 LTS, 64 Bits. 安装Bi ...
- spring boot 2.0 源码分析(二)
在上一章学习了spring boot 2.0启动的大概流程以后,今天我们来深挖一下SpringApplication实例变量的run函数. 先把这段run函数的代码贴出来: /*** Run the ...
- spring cloud集成 consul源码分析
1.简介 1.1 Consul is a tool for service discovery and configuration. Consul is distributed, highly ava ...
- 14.QueuedConnection和BlockingQueuedConnection连接方式源码分析
QT信号槽直连时的时序和信号槽的连接方式已经在前面的文章中分析过了,见https://blog.csdn.net/Master_Cui/article/details/109011425和https: ...
- Java线程池 源码分析
1.个人总结及想法: (1)ThreadPoolExecutor的继承关系? ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorSe ...
- java 线程池 源码_java线程池源码分析
我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了: 这两个方法又什么区别呢? 他们背后的原理是什么呢? 线程池中线程超过了coresize后会怎么操作呢? 为 ...
- 吐血整理:Java线程池源码分析(基于JDK1.8建议收藏)
文章目录 一.引言 二.线程池的参数介绍 1.ThreadPoolExecutor的UML图 三.线程池的使用 1.线程池的工作原理 2.线程池类型 2.1.newCachedThreadPool使用 ...
最新文章
- 【错误记录】Ubuntu 安装软件报错 ( Could not get lock /var/lib/dpkg/lock-frontend - open (11: Resource tempora )
- linux内核中的GPIO系统之(1):软件框架
- C++基类和派生类的析构函数
- [C++基础]017_数据的存放
- 1.0jpa 2.0_JPA 2.1如何成为新的EJB 2.0
- 浓浓中国风的雅致新年元旦PSD分层海报模板
- WebLogic及其重要概念
- Datawhale编程学习之排序(3)
- Python 使用pip下载失败的解决方案
- android发送语音动画,Android仿微信发送语音消息的功能及示例代码
- yarn安装依赖时出现错误(2种解决方法)Integrity check failed for “antd“ computed integrity doesn‘t match our records)
- STM32F407三重ADC配置
- ros android 方案,ROS + Android
- matlab坐标系无穷大,在 Matlab 中无穷大用 _______________ 表示
- ios7技巧:你需要掌握的19个iOS7使用技巧
- 网易视频云CEO余利华:云服务的核心仍是用户体验
- 三个最好的免费短信发送服务
- docker重启后启动失败Failed to start Docker Application Container Engine.
- edge浏览器 F12中文换成英文
- linux snap exe,在Linux系统上安装官方Snap Store应用程序