DataSource
1. 工厂方法模式
2. DataSourceFactory
在数据源模块中,DataSourceFactory接口扮演工厂接口的角色。UnpooledDataSourceFactory
和PooledDataSourceFactory
则扮演着具体工厂类的角色。
我们从DataSourceFactory
接口开始分析,其定义如下:
/**
* 设置DataSource相关属性,
* 一切紧跟在初始化完成之后,证明:
* {@link org.apache.ibatis.builder.xml.XMLConfigBuilder#dataSourceElement(XNode)}
* @param props
*/
void setProperties(Properties props);
/**
* 获取DataSource对象
* @return
*/
DataSource getDataSource();
2.1. UnpooledDataSourceFactory
在UnpooledDataSourceFactory
的构造方法中会直接创建UnpooledDataSource
对象,并初始化UnpooledDatasourceFactory.dataSource
字段。
protected DataSource dataSource;
/**
* 直接创建一个{@link UnpooledDataSource}
*/
public UnpooledDataSourceFactory() {
this.dataSource = new UnpooledDataSource();
}
UnpooledDataSourceFactory.setProperties()
方法会完成对UnpooledDataSourceFactory
对象的配置,代码如下:
@Override
public void setProperties(Properties properties) {
Properties driverProperties = new Properties();
//创建DataSource对应的MetaObject
MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
//遍历properties集合,该集合中配置了数据源需要的信息
for (Object key : properties.keySet()) {
String propertyName = (String) key;
//以"diver."开头的配置项是对DataSource的配置,记录到diverProperties中保存
if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
String value = properties.getProperty(propertyName);
driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
}
//是否有该属性对应的setter方法
else if (metaDataSource.hasSetter(propertyName)) {
String value = (String) properties.get(propertyName);
//根据属性类型将value的类型进行类型转换,主要是Integer、Long、Boolean三种类型转换
Object convertedValue = convertValue(metaDataSource, propertyName, value);
//设置DataSource的相关属性
metaDataSource.setValue(propertyName, convertedValue);
} else {
throw new DataSourceException("Unknown DataSource property: " + propertyName);
}
}
//设置DataSource的driverProperties属性
if (driverProperties.size() > 0) {
metaDataSource.setValue("driverProperties", driverProperties);
}
}
2.2. PooledDataSourceFactory
PooledDataSourceFactory
继承了UnpooledDataSourceFactory
,但是并没有覆盖setProperties()
和getDataSource()
方法。两者唯一不同的是PoolDataSoueceFactory
的构造函数会将其dataSource
字段初始化为PooledDataSource
对象。
/**
* 继承了{@link UnpooledDataSourceFactory},但是并没有覆盖{@link UnpooledDataSourceFactory#setProperties(Properties)}和{@link UnpooledDataSourceFactory#getDataSource()}方法,
* 唯一不同的是初始化的dataSource是不同的:
* {@link UnpooledDataSourceFactory} 初始化了{@link org.apache.ibatis.datasource.unpooled.UnpooledDataSource}
* {@link PooledDataSourceFactory} 初始化了{@link PooledDataSource}
* @author Clinton Begin
*/
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
2.3. JndiDataSourceFactory
JndiDataSourceFactory
是依赖JNDI服务从容器中获取用户配置的DataSource
,其逻辑并不复杂,可以参看Tomcat的JNDI相关文档。
3. DataSource
javax.sql.DataSource
接口在数据源模块中扮演了产品接口的角色,Mybatis
提供了两个DataSource
接口的实现类,分别是UnpooledDataSource
和PooledDataSource
,他们扮演着具体产品类的角色。
3.1. UnpooledDataSource
UnpooledDataSource
实现了javax.sql.DataSource
接口中定义的getConnection()
方法及其重载方法,用于获取数据库连接。每次通过UnpooledDataSource.getConnection()
方法获取数据库连接时都会创建一个新连接。UnpooledDataSource
中的字段如下,每个字段都有对应的getter
和setter
方法:
/**
* 加载Driver类的类加载器
*/
private ClassLoader driverClassLoader;
/**
* 数据库连接驱动的相关配置
*/
private Properties driverProperties;
/**
* 缓存所有已注册的数据库连接驱动
*/
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
/**
* 数据库连接驱动的名称
*/
private String driver;
/**
* 数据库Url
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 是否自动提交
*/
private Boolean autoCommit;
/**
* 事务隔离级别
*/
private Integer defaultTransactionIsolationLevel;
/**
* 默认连接网络超时(3.5.2)
*/
private Integer defaultNetworkTimeout;
3.1.1. 向DriverManager
注册驱动
以JDBC
为例,我们知道创建数据库连接之前,首先要向DriverManager
注册JDBC
驱动类,com.mysql.jdbc.Driver
中有如下静态代码块:
//com.mysql.jdbc.Driver.java
static {
try {
//#ifdef JAVA8
DriverManager.registerDriver(driverInstance, new EmptyDiverAction());
//#else
/*
DriverManager.registerDriver(driverInstance);
*/
//#endif JAVA8
} catch (Exception e) {
}
}
public static final JDBCDriver driverInstance = new JDBCDriver();
DriverManager
中定义了registerDrivers
字段用于记录注册的JDBC
驱动,定义如下:
//java.sql.DriverManager.java
// List of registered JDBC drivers
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
public static synchronized void registerDriver(java.sql.Driver driver,
DriverAction da)
throws SQLException {
/* Register the driver if it has not already been added to our list */
if(driver != null) {
registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
} else {
// This is for compatibility with the original DriverManager
throw new NullPointerException();
}
println("registerDriver: " + driver);
}
下面我们回到Mybatis
中的UnpooledDataSource
的分析,UnpooledDataSource
中定义了如下静态代码块,在UnpooledDataSource
加载时会通过静态代码块将已在DriverManager
中注册的JDBC Driver
复制一份到UnpooledDataSource.registeredDriver
集合中。
/**
* 此静态代码块,在当前类加载时将已经在DriverManager中注册的JDBC Driver复制一份到{@link UnpooledDataSource#registeredDrivers}中。
*/
static {
//获取到能加载到的所有的JDBC的驱动
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
//添加JDBC驱动
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
3.1.2. getConnection()
UnpooledDataSource.getConnection()
方法:
/**
* 获取连接
* @return
* @throws SQLException
*/
@Override
public Connection getConnection() throws SQLException {
return doGetConnection(username, password);
}
/**
* 获取连接
* @return
* @throws SQLException
*/
@Override
public Connection getConnection(String username, String password) throws SQLException {
return doGetConnection(username, password);
}
3.1.2. doGetConnection()
UnpooledDataSource.getConnection()
方法的所有重载最终会调用UnpooledDataSource.doGetConnection()
方法获取数据库连接,具体实现如下:
private Connection doGetConnection(String username, String password) throws SQLException {
Properties props = new Properties();
//添加驱动配置
if (driverProperties != null) {
props.putAll(driverProperties);
}
//添加连接用户名的key和value
if (username != null) {
props.setProperty("user", username);
}
//添加连接密码的key和value
if (password != null) {
props.setProperty("password", password);
}
//利用封装好的配置获取连接
return doGetConnection(props);
}
private Connection doGetConnection(Properties properties) throws SQLException {
//初始化数据库驱动
initializeDriver();
//创建真正的数据库连接
Connection connection = DriverManager.getConnection(url, properties);
//配置数据库连接的autoCommit和隔离级别
configureConnection(connection);
return connection;
}
3.1.3. initializeDriver()
UnpooledDataSource.initializeDriver()
方法主要负责数据驱动的初始化,该方法会创建配置中指定的Driver对象,并将其注册到DriverManger
以及上面介绍的UnpooledDataSource.registerDriver
集合中保存。
/**
* 初始化数据库驱动
* @throws SQLException
*/
private synchronized void initializeDriver() throws SQLException {
//判断驱动注册列表中是否包含我们要连接的数据库驱动,即检测驱动是否已注册
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
//判断是否指定了驱动类的加载器,如果指定了初始化驱动后续操作使用指定的ClassLoader,然后返回不同的驱动类型(ClassLoader不同,就算是同一个java文件,生成的class类型也是不同的。)
if (driverClassLoader != null) {
driverType = Class.forName(driver, true, driverClassLoader);
} else {
//如果没有指定加载器,那么使用默认的驱动类型
driverType = Resources.classForName(driver);
}
// DriverManager requires the driver to be loaded via the system ClassLoader.
// http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
//创建Driver对象
Driver driverInstance = (Driver)driverType.getDeclaredConstructor().newInstance();
//注册驱动,DriverProxy是UnpooledDataSource中的内部类,是Driver的静态代理类
DriverManager.registerDriver(new DriverProxy(driverInstance));
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
}
}
}
3.1.4. configureConnection()
UnpooledDataSource.configureConnection()
方法会完成数据连接的一系列配置,具体代码如下所示(defaultNetworkTimeout
这个属性是Mybatis3.5.2
版本加入的):
/**
* 完成数据库连接的一系列配置
* @param conn
* @throws SQLException
*/
private void configureConnection(Connection conn) throws SQLException {
//设置网络超时时间,这是3.5.2之后添加的属性
if (defaultNetworkTimeout != null) {
conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
}
//设置事务是否自动提交
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
conn.setAutoCommit(autoCommit);
}
//设置事务的隔离界别
if (defaultTransactionIsolationLevel != null) {
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}
3.2. PooledDataSource
数据库连接的创建时一个非常耗时的,数据库能够建立的连接数也非常有限,所以在绝大多数系统中,数据库连接是非常珍贵的资源,使用数据库连接池就显得尤为必要了。使用数据库连接池会带来很多的好处,例如,可以实现数据库连接的重用、提高响应速度、防止数据库连接过多造成数据库假死、避免数据库连接泄露等。
数据库连接池在初始化是,一般会创建一定数量的数据库连接并添加到连接池中备用。当程序需要使用数据库连接时,从池中请求连接;当程序不再使用该连接时,会将其返回到池中缓存,等待下次使用,而不是直接关闭。当然,数据库连接线会控制连接总数的上限以及空闲连接数的上限,如果连接池创建的总连接数已达到上限,且都已被占用,则后续请求连接的线程会进入阻塞队列等待,知道有线程释放出可用连接。如果连接池中空闲连接数较多,达到其上限,则后续返回的空闲连接不会放入池中,而是直接关闭,这样可以减少系统维护多余数据库连接的开销。
如果将总连接数的上线设置得过大,可能会因为连接数过多而导致数据库僵死,系统整体性能下降;如果总连接数上线过小,则无法完全发挥数据库的性能,浪费数据库资源。如果将空闲连接的上线设置得过大,则会浪费系统资源来维护这些空闲连接;如果空闲连接上线过小,当出现瞬间的峰值请求时,系统的快速响应能力就比较弱。所以在设置数据库连接池的这两个值时,需要进行性能测试、权衡以及一些经验。
PooledDataSource
实现了简易数据库连接池的功能,它依赖的组件如下图所指示,其中需要注意的是,PooledDataSource
创建新数据库连接的功能是依赖其中封装的UnpooledDataSource
对象实现的。

在研究PooledDataSource
的时候从一个简单的例子出发,不然不知道怎么研究PooledDataSource
、PooledConnection
、PoolState
、UnpooledDataSource
、DataSource
五者之间的调用关系:
package red.reksai.datasource;
import org.apache.ibatis.datasource.pooled.PooledDataSourceFactory;
import org.apache.ibatis.io.Resources;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
/**
* @author : <a href="mailto:gnehcgnaw@gmail.com">gnehcgnaw</a>
* @since : 2019/12/1 01:15
*/
public class PooledDataSourceFactoryTest {
@Test
public void test1() throws IOException, SQLException {
String resources = "resources/config.properties";
InputStream inputStream = Resources.getResourceAsStream(resources);
Properties properties = new Properties();
properties.load(inputStream);
PooledDataSourceFactory pooledDataSourceFactory = new PooledDataSourceFactory();
pooledDataSourceFactory.setProperties(properties);
Connection connection = pooledDataSourceFactory.getDataSource().getConnection();
PreparedStatement preparedStatement = connection.prepareStatement("select * from blog where blog_id = 1");
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
System.out.println(resultSet.getString(1));
}
}
}
有上述代码可以发现程序执行的第一步是:new PooledDataSourceFactory()
,而在PooledDataSourceFactory
构造中new
了一个PooledDataSource
,所以我们需要先解析PooledDataSource
。
PooledDataSource
中的核心字段如下所示:
/**
* 通过PoolState管理连接池的转台并记录统计信息
*/
private final PoolState state = new PoolState(this);
/**
* 创建一个PooledDataSource需要一个UnpooledDataSource
*/
private final UnpooledDataSource dataSource;
// OPTIONAL CONFIGURATION FIELDS
// 可选配置字段
/**
* 最大活跃连接数
*/
protected int poolMaximumActiveConnections = 10;
/**
* 最大空闲连接数
*/
protected int poolMaximumIdleConnections = 5;
/**
* 最大CheckoutTime时间(最大连接时间)
*/
protected int poolMaximumCheckoutTime = 20000;
/**
* 在无法获取连接时,线程需要等待的时间
*/
protected int poolTimeToWait = 20000;
protected int poolMaximumLocalBadConnectionTolerance = 3;
/**
* 在检测一个数据库连接是否可用时,会给数据库发送一个测试SQL语句
*/
protected String poolPingQuery = "NO PING QUERY SET";
/**
* 是否允许发送测试SQL
*/
protected boolean poolPingEnabled;
/**
* 当poolPingConnectionsNotUsedFor毫秒未使用时,会发送一次测试SQL语句,检测连接是否正常
*/
protected int poolPingConnectionsNotUsedFor;
/**
* 该hash用于标志着当前的连接池,在构造函数中初始化
* 生成规则:{@link PooledDataSource#assembleConnectionTypeCode(String, String, String)}
*/
private int expectedConnectionTypeCode;
PooledDataSource
中还提供了上述字段的getter
和setter
方法,代码比较简单。其中有个与众不同的属性赋值private final PoolState state = new PoolState(this);
,这使得我先去研究PoolState
了,一会回来。
PoolState
是用于管理PooledConnection
对象状态的组件。为什么这么说呢?看看它定义的字段:
protected PooledDataSource dataSource;
/**
* 空闲的PooledConnection集合
*/
protected final List<PooledConnection> idleConnections = new ArrayList<>();
/**
* 活跃的PooledConnection集合
*/
protected final List<PooledConnection> activeConnections = new ArrayList<>();
/**
* 请求数据库连接的次数
*/
protected long requestCount = 0;
/**
* 获取连接的累计时长
*/
protected long accumulatedRequestTime = 0;
/**
* CheckoutTime表示应用从连接池中取出连接,到归还连接这端时长
* accumulatedCheckoutTime记录的是所有连接累计的CheckoutTime时长
*/
protected long accumulatedCheckoutTime = 0;
/**
* 当连接长时间未被归还给连接池时,会被认为该连接超时
* claimedOverdueConnectionCount 记录的是超时的连接个数
*/
protected long claimedOverdueConnectionCount = 0;
/**
* 累计超时时间
*/
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
/**
* 累计等待时间
*/
protected long accumulatedWaitTime = 0;
/**
* 等待次数
*/
protected long hadToWaitCount = 0;
/**
* 无效连接数
*/
protected long badConnectionCount = 0;
由以上字段可以看出,PoolState
是通过两个Arraylist<PooledConnection>
分别管理空闲状态的连接和活跃状态的连接的,当然了PoolState
中还定义了一系列用于统计的字段。
简单介绍完以上,我们再回到PooledDataSource
,分析其构造方法,如下所示:
public PooledDataSource() {
dataSource = new UnpooledDataSource();
}
public PooledDataSource(UnpooledDataSource dataSource) {
this.dataSource = dataSource;
}
public PooledDataSource(String driver, String url, String username, String password) {
dataSource = new UnpooledDataSource(driver, url, username, password);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
public PooledDataSource(String driver, String url, Properties driverProperties) {
dataSource = new UnpooledDataSource(driver, url, driverProperties);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
有PooledDataSource
的构造方法可知,PooledDataSource
的创建需要一个UnPooledDataSource
对象。
拿到DataSource
对象之后,就要通过DataSource.getConnection()
去获取数据库连接对象,在PooledDataSource
中的getConnection()
代码如下所示:
/**
* 获取连接:
* 首先通过{@link PooledDataSource#popConnection(String, String)}获取{@link PooledConnection},
* 因为PooledConnection只实现了{@link InvocationHandler} 接口,并未实现java.sql.Connection,故而这个PooledConnection不能使用,
* 需要使用{@link PooledConnection#getProxyConnection()}获取一个JDK动态代理生成的实现了java.sql.Connection的代理对象。
* @return
* @throws SQLException
*/
@Override
public Connection getConnection() throws SQLException {
return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}
/**
* 获取连接
* @return
* @throws SQLException
*/
@Override
public Connection getConnection(String username, String password) throws SQLException {
return popConnection(username, password).getProxyConnection();
}
由以上代码可以看出,不管调用的是哪个PooledDataSource.getConnection()
方法的重载,最终都调用到了两个方法:
- 通过
PooledDataSource.popConnection()
获取一个PooledConnection
,因为PooledConnection
只实现了InvocationHandler
接口,并未实现java.sql.Connection
,故而这个PooledConnection
是不能操作数据库的,需要通过它获取一个java.sql.Connection
的代理对象,也就有了第二部的操作; - 使用
PooledConnection.getProxyConnection()
,获取一个JDK
动态代理生成的实现了java.sql.Connection
的代理对象。(这一步很简单就是一个get
值的过程,不去研究。)
所以我们接下来先分析PooledDataSource.popConnection()
方法,此方法具体代码如下所示:
/**
* pop连接
* @param username
* @param password
* @return PooledConnection的代理对象
* @throws SQLException
*/
private PooledConnection popConnection(String username, String password) throws SQLException {
//等待,默认是不等待
boolean countedWait = false;
PooledConnection conn = null;
//创建或判断连接之前系统时间
long t = System.currentTimeMillis();
//本地错误连接数
int localBadConnectionCount = 0;
//1. 当连接为null的时候,去执行循环
while (conn == null) {
synchronized (state) {
//2. 判断有没有空闲连接
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
//有空闲连接,就获取连接,然后把当前连接从空闲连接中移除
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
// Pool does not have available connection
//如果没有空闲连接
//首先判断活跃连接是不是小于最大活跃数,如果小于可以创建新连接
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
// 创建一个新连接(这是一个代理对象)
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {
// Cannot create new connection
// 如果判断活跃连接数等于最大活跃数,获取最老的活跃连接
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
// 获取此连接的连接时长(当前时间—取出连接的时间)
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 然后判断是否超时(此连接的连接时间 和 运行连接的时间 比较)
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// 当前连接超时
// Can claim overdue connection
//对超时连接进行统计
//超时连接数+1
state.claimedOverdueConnectionCount++;
//总累计超时时间 = 原有总累计超时时间+当前连接时间(因为当前连接已经超时)
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
//总累计连接时间 = 原有总累计连接时间+ 当前连接时间
state.accumulatedCheckoutTime += longestCheckoutTime;
//从活跃连接中移除最老的这个超时连接
state.activeConnections.remove(oldestActiveConnection);
//获取真正的数据库连接,判断数据库提交模式(自动提交事务还是手动)
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
//如果是不自动提交事务的情况,那么就要回滚本次操作
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
//重新创建连接
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
//设置该连接创建的时间戳
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
//设置该连接的最后使用时间
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
//然后作废之前的最老的超时连接,因为此前只是从集合中移除,并不表示它不能使用,而这一步就是确保这种情况不会出现。
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// 没有空闲连接、活跃的连接数又等于限定的最大连接数(即:无法创建连接)而且无超时连接、则只能阻塞等待
// Must wait
try {
//如果此前没有等待的
if (!countedWait) {
//先将等待数+1
state.hadToWaitCount++;
//然后将状态设置为等待状态
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
//获取当前系统时间
long wt = System.currentTimeMillis();
//获取需要等待的时间,利用Object.wait(需要等待的时间),让当前线程进行等待
state.wait(poolTimeToWait);
//更新累计等待时间:累计等待时间=当得系统时间+当前时间-等待直接记录的系统时间
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
// 再次判断连接是否为空
// 如果不为空
if (conn != null) {
// ping to server and check the connection is valid or not
//判断连接是否有效
if (conn.isValid()) {
//如果当前连接不是自动提交事务,那就回滚之前操作
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//然后重现设置用于标识该连接所在的连接池的标识码
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
//设置连接时长
conn.setCheckoutTimestamp(System.currentTimeMillis());
//设置最后修改时间
conn.setLastUsedTimestamp(System.currentTimeMillis());
//把当前连接添加到活跃连接集合中
state.activeConnections.add(conn);
//然后把连接次数+1
state.requestCount++;
//累计请求连接时间
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
//如果当前连接不为空,但是是失效的,那么表明此连接是一个坏连接(无效连接)
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
//无效连接数+1
state.badConnectionCount++;
//本地错误连接数+1
localBadConnectionCount++;
//设置连接为空
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
//此时连接为空,表明发生了未知错误
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
//最后返回连接
return conn;
}
以上代码的逻辑流程如下所示:
有以上流程,我们发现在在活跃数没有大于最大限定的时候创建了一个新的PooledConnection
,这里调用了PooledConnection
的构造方法,代码如下所示:
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
/**
* 这里的this 为 {@link PooledConnection#invoke(Object, Method, Object[])}
*/
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}
这里就初始化了一个java.sql.Connection
的代理对象(this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
),后续进行的操作就是通过这个代理对象完成的,因为代理对象的执行最终要执行到InvocartionHandler
的实现的invoke()
方法,而PooledConnection
就实现了InvocationHandler
方法,故我们执行的java.sql.Connection
中的方法最终执行的都是PooledConnection
中的invoke()
方法,PooledConnection.invoke()
方法代码如下所示:
/**
* Required for InvocationHandler implementation.
*
* @param proxy - not used
* @param method - the method to be executed
* @param args - the parameters to be passed to the method
* @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
//如果调用的是close()方法,则重现将连接放回到连接池,而不是真正的关闭数据库连接
if (CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
//通过valid字段检测数据库连接是否有效
checkConnection();
}
//调用真正数据库连接对象的对应方法
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
以上代码中重要的方法是PooledDataSource.pushConnection()
,PooledDataSource.pushConnection()
的代码如下所示:
/**
* 放回连接
* @param conn
* @throws SQLException
*/
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) {
//从活跃连接集合中移除此连接
state.activeConnections.remove(conn);
//判断此连接是否有效
if (conn.isValid()) {
//判断空闲连接数是否小于最大空闲连接数 (即:判断空闲连接数是否达到上限) 以及此连接是否是该连接池的连接
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
//空闲连接数没有达到上限
//累计checkOut时长
state.accumulatedCheckoutTime += conn.getCheckoutTime();
//回滚未提交的事务
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//为返还连接创造新的PooledConnection对象
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
//然后将新对象添加到活跃集合
state.idleConnections.add(newConn);
//设置新连接创建时间戳
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
//设置新连接最后使用时间戳
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
//将老连接对象设置为无效
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
//唤醒等待的线程
state.notifyAll();
} else {
//空闲连接数已达到上限 或 PooledConnection对象不属于该连接池
//累计checkOur时长
state.accumulatedCheckoutTime += conn.getCheckoutTime();
//回滚未提交的操作
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//因为这个PooledConnection对象不属于该连接池,所以直接关闭,而不是放回连接池
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
//最后再将对象设置为无效
conn.invalidate();
}
} else {
//如果此连接是无效连接,抛出异常,并且记录先关统计数据
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
//统计无效的PooledConnection对象
state.badConnectionCount++;
}
}
}
以上代码的逻辑流程如下所示:
需要注意的是,PooledDataSouece.pushConnection()
方法和PooledDataSouece.popConnection()
方法中都调用了PooledDataSource.isValid()
方法来检测PooledConnection
的有效性,该方法除了检测PooledDataSource.valid
字段的值,还会调用PooledDataSource.pingConnection()
方法尝试让数据库执行poolPingQuery
字段中记录的测试SQL
语句,从而检测真正的数据库连接对象是否依然可以正常使用。isValid()
方法以及pingConnection()
方法的代码如下所示:
/**
* 检测PooledConnection的有效性
* Method to see if the connection is usable.
*
* @return True if the connection is usable
*/
public boolean isValid() {
//除了检测valid字段外,还要使用PooledDataSource.pingConnection()方法,向数据库发出测试语句来进一步判断
return valid && realConnection != null && dataSource.pingConnection(this);
}
/**
* 用于测试连接
* Method to check to see if a connection is still usable
*
* @param conn - the connection to check
* @return True if the connection is still usable
*/
protected boolean pingConnection(PooledConnection conn) {
boolean result = true;
try {
//检测真正的连接是否已关闭
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
result = false;
}
if (result) {
//判断要不要发不出测试语句
if (poolPingEnabled) {
//要
//
if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
if (log.isDebugEnabled()) {
log.debug("Testing connection " + conn.getRealHashCode() + " ...");
}
Connection realConn = conn.getRealConnection();
try (Statement statement = realConn.createStatement()) {
statement.executeQuery(poolPingQuery).close();
}
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
}
} catch (Exception e) {
log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
try {
conn.getRealConnection().close();
} catch (Exception e2) {
//ignore
}
result = false;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
}
}
}
}
return result;
}
最后需要注意的是PooledDataSource.forceCloseAll()
,当修改PooledDataSource
的字段是,例如数据库的URL
、用户名
、密码
、autoCommit
配置等,都会调用PooledDataSource.forceCloseAll()
方法将所有的数据库连接都关掉,同时也会将相应的PooledConnection
对象都设置为无效,清空activeConnections
集合和idleConnections
集合。应用系统之后通过PoolDataSource.getConnection()
获取连接时,会按照新的配置重新创建新的数据库连接以及对应的PooledConnection
对象。forceCloseAll()
方法的具体实现如下所示:
/**
* `PooledDataSource.forceCloseAll()`,当修改`PooledDataSource`的字段是,例如数据库的`URL`、`用户名`、`密码`、`autoCommit`配置等,
* 都会调用`PooledDataSource.forceCloseAll()`方法将所有的数据库连接都关掉,同时也会将相应的`PooledConnection`对象都设置为无效,
* 清空`activeConnections`集合和`idleConnections`集合。应用系统之后通过`PoolDataSource.getConnection()`获取连接时,
* 会按照新的配置重新创建新的数据库连接以及对应的`PooledConnection`对象.
* Closes all active and idle connections in the pool.
*/
public void forceCloseAll() {
synchronized (state) {
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.activeConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
}
if (log.isDebugEnabled()) {
log.debug("PooledDataSource forcefully closed/removed all connections.");
}
}
Mybatis源码分析 基础支持层 Mybatis源码分析 工厂方法模式
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!