DataSource

1. 工厂方法模式

2. DataSourceFactory

在数据源模块中,DataSourceFactory接口扮演工厂接口的角色。UnpooledDataSourceFactoryPooledDataSourceFactory则扮演着具体工厂类的角色。

我们从DataSourceFactory接口开始分析,其定义如下:

  /**
   * 设置DataSource相关属性,
   * 一切紧跟在初始化完成之后,证明:
   *  {@link org.apache.ibatis.builder.xml.XMLConfigBuilder#dataSourceElement(XNode)}
   * @param props
   */
  void setProperties(Properties props);

  /**
   * 获取DataSource对象
   * @return
   */
  DataSource getDataSource();

2.1. UnpooledDataSourceFactory

UnpooledDataSourceFactory的构造方法中会直接创建UnpooledDataSource对象,并初始化UnpooledDatasourceFactory.dataSource字段。

  protected DataSource dataSource;

  /**
   * 直接创建一个{@link UnpooledDataSource}
   */
  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

UnpooledDataSourceFactory.setProperties()方法会完成对UnpooledDataSourceFactory对象的配置,代码如下:

@Override
  public void setProperties(Properties properties) {
    Properties driverProperties = new Properties();
    //创建DataSource对应的MetaObject
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    //遍历properties集合,该集合中配置了数据源需要的信息
    for (Object key : properties.keySet()) {
      String propertyName = (String) key;
      //以"diver."开头的配置项是对DataSource的配置,记录到diverProperties中保存
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        String value = properties.getProperty(propertyName);
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
      }
      //是否有该属性对应的setter方法
      else if (metaDataSource.hasSetter(propertyName)) {
        String value = (String) properties.get(propertyName);
        //根据属性类型将value的类型进行类型转换,主要是Integer、Long、Boolean三种类型转换
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        //设置DataSource的相关属性
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    //设置DataSource的driverProperties属性
    if (driverProperties.size() > 0) {
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }

2.2. PooledDataSourceFactory

PooledDataSourceFactory继承了UnpooledDataSourceFactory,但是并没有覆盖setProperties()getDataSource()方法。两者唯一不同的是PoolDataSoueceFactory的构造函数会将其dataSource字段初始化为PooledDataSource对象。

/**
 * 继承了{@link UnpooledDataSourceFactory},但是并没有覆盖{@link UnpooledDataSourceFactory#setProperties(Properties)}和{@link UnpooledDataSourceFactory#getDataSource()}方法,
 * 唯一不同的是初始化的dataSource是不同的:
 *      {@link UnpooledDataSourceFactory} 初始化了{@link org.apache.ibatis.datasource.unpooled.UnpooledDataSource}
 *      {@link PooledDataSourceFactory} 初始化了{@link PooledDataSource}
 * @author Clinton Begin
 */
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }

}

2.3. JndiDataSourceFactory

JndiDataSourceFactory是依赖JNDI服务从容器中获取用户配置的DataSource,其逻辑并不复杂,可以参看Tomcat的JNDI相关文档。

3. DataSource

javax.sql.DataSource接口在数据源模块中扮演了产品接口的角色,Mybatis提供了两个DataSource接口的实现类,分别是UnpooledDataSourcePooledDataSource,他们扮演着具体产品类的角色。

3.1. UnpooledDataSource

UnpooledDataSource实现了javax.sql.DataSource接口中定义的getConnection()方法及其重载方法,用于获取数据库连接。每次通过UnpooledDataSource.getConnection()方法获取数据库连接时都会创建一个新连接。UnpooledDataSource中的字段如下,每个字段都有对应的gettersetter方法:

  /**
   * 加载Driver类的类加载器
   */
  private ClassLoader driverClassLoader;
  /**
   * 数据库连接驱动的相关配置
   */
  private Properties driverProperties;
  /**
   * 缓存所有已注册的数据库连接驱动
   */
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
  /**
   * 数据库连接驱动的名称
   */
  private String driver;
  /**
   * 数据库Url
   */
  private String url;
  /**
   * 用户名
   */
  private String username;
  /**
   * 密码
   */
  private String password;
  /**
   * 是否自动提交
   */
  private Boolean autoCommit;
  /**
   * 事务隔离级别
   */
  private Integer defaultTransactionIsolationLevel;
  /**
   * 默认连接网络超时(3.5.2)
   */
  private Integer defaultNetworkTimeout;

3.1.1. 向DriverManager注册驱动

JDBC为例,我们知道创建数据库连接之前,首先要向DriverManager注册JDBC驱动类,com.mysql.jdbc.Driver中有如下静态代码块:

//com.mysql.jdbc.Driver.java
static {
        try {

//#ifdef JAVA8
            DriverManager.registerDriver(driverInstance, new EmptyDiverAction());
//#else
/*
            DriverManager.registerDriver(driverInstance);
*/

//#endif JAVA8

        } catch (Exception e) {
        }
    }

  public static final JDBCDriver driverInstance = new JDBCDriver();

DriverManager中定义了registerDrivers字段用于记录注册的JDBC驱动,定义如下:

//java.sql.DriverManager.java
// List of registered JDBC drivers
    private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

  public static synchronized void registerDriver(java.sql.Driver driver,
            DriverAction da)
        throws SQLException {

        /* Register the driver if it has not already been added to our list */
        if(driver != null) {
            registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
        } else {
            // This is for compatibility with the original DriverManager
            throw new NullPointerException();
        }

        println("registerDriver: " + driver);

    }

下面我们回到Mybatis中的UnpooledDataSource的分析,UnpooledDataSource中定义了如下静态代码块,在UnpooledDataSource加载时会通过静态代码块将已在DriverManager中注册的JDBC Driver复制一份到UnpooledDataSource.registeredDriver集合中。

/**
   * 此静态代码块,在当前类加载时将已经在DriverManager中注册的JDBC Driver复制一份到{@link UnpooledDataSource#registeredDrivers}中。
   */
  static {
    //获取到能加载到的所有的JDBC的驱动
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      //添加JDBC驱动
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

3.1.2. getConnection()

UnpooledDataSource.getConnection()方法:

 /**
   * 获取连接
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection() throws SQLException {
    return doGetConnection(username, password);
  }
  /**
   * 获取连接
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return doGetConnection(username, password);
  }

3.1.2. doGetConnection()

UnpooledDataSource.getConnection()方法的所有重载最终会调用UnpooledDataSource.doGetConnection()方法获取数据库连接,具体实现如下:

  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    //添加驱动配置
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    //添加连接用户名的key和value
    if (username != null) {
      props.setProperty("user", username);
    }
    //添加连接密码的key和value
    if (password != null) {
      props.setProperty("password", password);
    }
    //利用封装好的配置获取连接
    return doGetConnection(props);
  }
  private Connection doGetConnection(Properties properties) throws SQLException {
    //初始化数据库驱动
    initializeDriver();
    //创建真正的数据库连接
    Connection connection = DriverManager.getConnection(url, properties);
    //配置数据库连接的autoCommit和隔离级别
    configureConnection(connection);
    return connection;
  }

3.1.3. initializeDriver()

UnpooledDataSource.initializeDriver()方法主要负责数据驱动的初始化,该方法会创建配置中指定的Driver对象,并将其注册到DriverManger以及上面介绍的UnpooledDataSource.registerDriver集合中保存。

/**
   * 初始化数据库驱动
   * @throws SQLException
   */
  private synchronized void initializeDriver() throws SQLException {
    //判断驱动注册列表中是否包含我们要连接的数据库驱动,即检测驱动是否已注册
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        //判断是否指定了驱动类的加载器,如果指定了初始化驱动后续操作使用指定的ClassLoader,然后返回不同的驱动类型(ClassLoader不同,就算是同一个java文件,生成的class类型也是不同的。)
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          //如果没有指定加载器,那么使用默认的驱动类型
          driverType = Resources.classForName(driver);
        }
        // DriverManager requires the driver to be loaded via the system ClassLoader.
        // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
        //创建Driver对象
        Driver driverInstance = (Driver)driverType.getDeclaredConstructor().newInstance();
        //注册驱动,DriverProxy是UnpooledDataSource中的内部类,是Driver的静态代理类
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }

3.1.4. configureConnection()

UnpooledDataSource.configureConnection()方法会完成数据连接的一系列配置,具体代码如下所示(defaultNetworkTimeout这个属性是Mybatis3.5.2版本加入的):

  /**
   * 完成数据库连接的一系列配置
   * @param conn
   * @throws SQLException
   */
  private void configureConnection(Connection conn) throws SQLException {
    //设置网络超时时间,这是3.5.2之后添加的属性
    if (defaultNetworkTimeout != null) {
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }
    //设置事务是否自动提交
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }
    //设置事务的隔离界别
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }

3.2. PooledDataSource

数据库连接的创建时一个非常耗时的,数据库能够建立的连接数也非常有限,所以在绝大多数系统中,数据库连接是非常珍贵的资源,使用数据库连接池就显得尤为必要了。使用数据库连接池会带来很多的好处,例如,可以实现数据库连接的重用、提高响应速度、防止数据库连接过多造成数据库假死、避免数据库连接泄露等。

数据库连接池在初始化是,一般会创建一定数量的数据库连接并添加到连接池中备用。当程序需要使用数据库连接时,从池中请求连接;当程序不再使用该连接时,会将其返回到池中缓存,等待下次使用,而不是直接关闭。当然,数据库连接线会控制连接总数的上限以及空闲连接数的上限,如果连接池创建的总连接数已达到上限,且都已被占用,则后续请求连接的线程会进入阻塞队列等待,知道有线程释放出可用连接。如果连接池中空闲连接数较多,达到其上限,则后续返回的空闲连接不会放入池中,而是直接关闭,这样可以减少系统维护多余数据库连接的开销。

如果将总连接数的上线设置得过大,可能会因为连接数过多而导致数据库僵死,系统整体性能下降;如果总连接数上线过小,则无法完全发挥数据库的性能,浪费数据库资源。如果将空闲连接的上线设置得过大,则会浪费系统资源来维护这些空闲连接;如果空闲连接上线过小,当出现瞬间的峰值请求时,系统的快速响应能力就比较弱。所以在设置数据库连接池的这两个值时,需要进行性能测试、权衡以及一些经验。

PooledDataSource实现了简易数据库连接池的功能,它依赖的组件如下图所指示,其中需要注意的是,PooledDataSource创建新数据库连接的功能是依赖其中封装的UnpooledDataSource对象实现的。

在研究PooledDataSource的时候从一个简单的例子出发,不然不知道怎么研究PooledDataSourcePooledConnectionPoolStateUnpooledDataSourceDataSource五者之间的调用关系:

package red.reksai.datasource;

import org.apache.ibatis.datasource.pooled.PooledDataSourceFactory;
import org.apache.ibatis.io.Resources;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

/**
 * @author : <a href="mailto:gnehcgnaw@gmail.com">gnehcgnaw</a>
 * @since : 2019/12/1 01:15
 */
public class PooledDataSourceFactoryTest {
  @Test
  public void test1() throws IOException, SQLException {
    String resources = "resources/config.properties";
    InputStream inputStream = Resources.getResourceAsStream(resources);
    Properties properties = new Properties();
    properties.load(inputStream);
    PooledDataSourceFactory pooledDataSourceFactory = new PooledDataSourceFactory();
    pooledDataSourceFactory.setProperties(properties);
    Connection connection = pooledDataSourceFactory.getDataSource().getConnection();
    PreparedStatement preparedStatement = connection.prepareStatement("select * from blog where blog_id = 1");
    ResultSet resultSet = preparedStatement.executeQuery();
    while (resultSet.next()){
      System.out.println(resultSet.getString(1));
    }
  }
}

有上述代码可以发现程序执行的第一步是:new PooledDataSourceFactory(),而在PooledDataSourceFactory构造中new 了一个PooledDataSource,所以我们需要先解析PooledDataSource

PooledDataSource中的核心字段如下所示:

/**
   * 通过PoolState管理连接池的转台并记录统计信息
   */
  private final PoolState state = new PoolState(this);

  /**
   * 创建一个PooledDataSource需要一个UnpooledDataSource
   */
  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  // 可选配置字段
  /**
   * 最大活跃连接数
   */
  protected int poolMaximumActiveConnections = 10;
  /**
   * 最大空闲连接数
   */
  protected int poolMaximumIdleConnections = 5;
  /**
   * 最大CheckoutTime时间(最大连接时间)
   */
  protected int poolMaximumCheckoutTime = 20000;
  /**
   * 在无法获取连接时,线程需要等待的时间
   */
  protected int poolTimeToWait = 20000;
  protected int poolMaximumLocalBadConnectionTolerance = 3;
  /**
   * 在检测一个数据库连接是否可用时,会给数据库发送一个测试SQL语句
   */
  protected String poolPingQuery = "NO PING QUERY SET";
  /**
   * 是否允许发送测试SQL
   */
  protected boolean poolPingEnabled;
  /**
   * 当poolPingConnectionsNotUsedFor毫秒未使用时,会发送一次测试SQL语句,检测连接是否正常
   */
  protected int poolPingConnectionsNotUsedFor;
  /**
   * 该hash用于标志着当前的连接池,在构造函数中初始化
   *    生成规则:{@link PooledDataSource#assembleConnectionTypeCode(String, String, String)}
   */
  private int expectedConnectionTypeCode;

PooledDataSource中还提供了上述字段的gettersetter方法,代码比较简单。其中有个与众不同的属性赋值private final PoolState state = new PoolState(this);,这使得我先去研究PoolState了,一会回来。


PoolState是用于管理PooledConnection对象状态的组件。为什么这么说呢?看看它定义的字段:

 protected PooledDataSource dataSource;
  /**
   * 空闲的PooledConnection集合
   */
  protected final List<PooledConnection> idleConnections = new ArrayList<>();
  /**
   * 活跃的PooledConnection集合
   */
  protected final List<PooledConnection> activeConnections = new ArrayList<>();
  /**
   * 请求数据库连接的次数
   */
  protected long requestCount = 0;
  /**
   * 获取连接的累计时长
   */
  protected long accumulatedRequestTime = 0;
  /**
   * CheckoutTime表示应用从连接池中取出连接,到归还连接这端时长
   * accumulatedCheckoutTime记录的是所有连接累计的CheckoutTime时长
   */
  protected long accumulatedCheckoutTime = 0;
  /**
   * 当连接长时间未被归还给连接池时,会被认为该连接超时
   * claimedOverdueConnectionCount 记录的是超时的连接个数
   */
  protected long claimedOverdueConnectionCount = 0;
  /**
   * 累计超时时间
   */
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  /**
   * 累计等待时间
   */
  protected long accumulatedWaitTime = 0;
  /**
   * 等待次数
   */
  protected long hadToWaitCount = 0;
  /**
   * 无效连接数
   */
  protected long badConnectionCount = 0;

由以上字段可以看出,PoolState是通过两个Arraylist<PooledConnection>分别管理空闲状态的连接和活跃状态的连接的,当然了PoolState中还定义了一系列用于统计的字段。


简单介绍完以上,我们再回到PooledDataSource,分析其构造方法,如下所示:

public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

  public PooledDataSource(UnpooledDataSource dataSource) {
    this.dataSource = dataSource;
  }

  public PooledDataSource(String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

PooledDataSource的构造方法可知,PooledDataSource的创建需要一个UnPooledDataSource对象。

拿到DataSource对象之后,就要通过DataSource.getConnection()去获取数据库连接对象,在PooledDataSource中的getConnection()代码如下所示:

  /**
   * 获取连接:
   *    首先通过{@link PooledDataSource#popConnection(String, String)}获取{@link PooledConnection},
   *    因为PooledConnection只实现了{@link InvocationHandler} 接口,并未实现java.sql.Connection,故而这个PooledConnection不能使用,
   *    需要使用{@link PooledConnection#getProxyConnection()}获取一个JDK动态代理生成的实现了java.sql.Connection的代理对象。
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection() throws SQLException {
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }
  /**
   * 获取连接
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }

由以上代码可以看出,不管调用的是哪个PooledDataSource.getConnection()方法的重载,最终都调用到了两个方法:

  1. 通过PooledDataSource.popConnection()获取一个PooledConnection,因为PooledConnection只实现了InvocationHandler接口,并未实现java.sql.Connection,故而这个PooledConnection是不能操作数据库的,需要通过它获取一个java.sql.Connection的代理对象,也就有了第二部的操作;
  2. 使用PooledConnection.getProxyConnection(),获取一个JDK动态代理生成的实现了java.sql.Connection的代理对象。(这一步很简单就是一个get值的过程,不去研究。)

所以我们接下来先分析PooledDataSource.popConnection()方法,此方法具体代码如下所示:

/**
   * pop连接
   * @param username
   * @param password
   * @return   PooledConnection的代理对象
   * @throws SQLException
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {
    //等待,默认是不等待
    boolean countedWait = false;
    PooledConnection conn = null;
    //创建或判断连接之前系统时间
    long t = System.currentTimeMillis();
    //本地错误连接数
    int localBadConnectionCount = 0;
    //1. 当连接为null的时候,去执行循环
    while (conn == null) {
      synchronized (state) {
        //2. 判断有没有空闲连接
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          //有空闲连接,就获取连接,然后把当前连接从空闲连接中移除
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // Pool does not have available connection
          //如果没有空闲连接
          //首先判断活跃连接是不是小于最大活跃数,如果小于可以创建新连接
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            // 创建一个新连接(这是一个代理对象)
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection
            // 如果判断活跃连接数等于最大活跃数,获取最老的活跃连接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            // 获取此连接的连接时长(当前时间—取出连接的时间)
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            // 然后判断是否超时(此连接的连接时间  和 运行连接的时间  比较)
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 当前连接超时
              // Can claim overdue connection
              //对超时连接进行统计

              //超时连接数+1
              state.claimedOverdueConnectionCount++;
              //总累计超时时间 = 原有总累计超时时间+当前连接时间(因为当前连接已经超时)
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              //总累计连接时间 = 原有总累计连接时间+ 当前连接时间
              state.accumulatedCheckoutTime += longestCheckoutTime;
              //从活跃连接中移除最老的这个超时连接
              state.activeConnections.remove(oldestActiveConnection);
              //获取真正的数据库连接,判断数据库提交模式(自动提交事务还是手动)
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  //如果是不自动提交事务的情况,那么就要回滚本次操作
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              //重新创建连接
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              //设置该连接创建的时间戳
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              //设置该连接的最后使用时间
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              //然后作废之前的最老的超时连接,因为此前只是从集合中移除,并不表示它不能使用,而这一步就是确保这种情况不会出现。
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // 没有空闲连接、活跃的连接数又等于限定的最大连接数(即:无法创建连接)而且无超时连接、则只能阻塞等待
              // Must wait
              try {
                //如果此前没有等待的
                if (!countedWait) {
                  //先将等待数+1
                  state.hadToWaitCount++;
                  //然后将状态设置为等待状态
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                //获取当前系统时间
                long wt = System.currentTimeMillis();
                //获取需要等待的时间,利用Object.wait(需要等待的时间),让当前线程进行等待
                state.wait(poolTimeToWait);
                //更新累计等待时间:累计等待时间=当得系统时间+当前时间-等待直接记录的系统时间
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        // 再次判断连接是否为空
        // 如果不为空
        if (conn != null) {
          // ping to server and check the connection is valid or not
          //判断连接是否有效
          if (conn.isValid()) {
            //如果当前连接不是自动提交事务,那就回滚之前操作
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            //然后重现设置用于标识该连接所在的连接池的标识码
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            //设置连接时长
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            //设置最后修改时间
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            //把当前连接添加到活跃连接集合中
            state.activeConnections.add(conn);
            //然后把连接次数+1
            state.requestCount++;
            //累计请求连接时间
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            //如果当前连接不为空,但是是失效的,那么表明此连接是一个坏连接(无效连接)
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            //无效连接数+1
            state.badConnectionCount++;
            //本地错误连接数+1
            localBadConnectionCount++;
            //设置连接为空
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }
    //此时连接为空,表明发生了未知错误
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }
    //最后返回连接
    return conn;
  }

以上代码的逻辑流程如下所示:

有以上流程,我们发现在在活跃数没有大于最大限定的时候创建了一个新的PooledConnection,这里调用了PooledConnection的构造方法,代码如下所示:

 public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    /**
     * 这里的this 为 {@link PooledConnection#invoke(Object, Method, Object[])}
     */
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

这里就初始化了一个java.sql.Connection的代理对象(this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);),后续进行的操作就是通过这个代理对象完成的,因为代理对象的执行最终要执行到InvocartionHandler的实现的invoke()方法,而PooledConnection就实现了InvocationHandler方法,故我们执行的java.sql.Connection中的方法最终执行的都是PooledConnection中的invoke()方法,PooledConnection.invoke()方法代码如下所示:

/**
   * Required for InvocationHandler implementation.
   *
   * @param proxy  - not used
   * @param method - the method to be executed
   * @param args   - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    //如果调用的是close()方法,则重现将连接放回到连接池,而不是真正的关闭数据库连接
    if (CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    }
    try {
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        //通过valid字段检测数据库连接是否有效
        checkConnection();
      }
      //调用真正数据库连接对象的对应方法
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }

  }

以上代码中重要的方法是PooledDataSource.pushConnection()PooledDataSource.pushConnection()的代码如下所示:

 /**
   * 放回连接
   * @param conn
   * @throws SQLException
   */
  protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      //从活跃连接集合中移除此连接
      state.activeConnections.remove(conn);
      //判断此连接是否有效
      if (conn.isValid()) {
        //判断空闲连接数是否小于最大空闲连接数  (即:判断空闲连接数是否达到上限) 以及此连接是否是该连接池的连接
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          //空闲连接数没有达到上限
          //累计checkOut时长
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          //回滚未提交的事务
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          //为返还连接创造新的PooledConnection对象
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          //然后将新对象添加到活跃集合
          state.idleConnections.add(newConn);
          //设置新连接创建时间戳
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          //设置新连接最后使用时间戳
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          //将老连接对象设置为无效
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          //唤醒等待的线程
          state.notifyAll();
        } else {
          //空闲连接数已达到上限   或   PooledConnection对象不属于该连接池
          //累计checkOur时长
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          //回滚未提交的操作
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          //因为这个PooledConnection对象不属于该连接池,所以直接关闭,而不是放回连接池
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          //最后再将对象设置为无效
          conn.invalidate();
        }
      } else {
        //如果此连接是无效连接,抛出异常,并且记录先关统计数据
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        //统计无效的PooledConnection对象
        state.badConnectionCount++;
      }
    }
  }

以上代码的逻辑流程如下所示:


需要注意的是,PooledDataSouece.pushConnection()方法和PooledDataSouece.popConnection()方法中都调用了PooledDataSource.isValid()方法来检测PooledConnection的有效性,该方法除了检测PooledDataSource.valid字段的值,还会调用PooledDataSource.pingConnection()方法尝试让数据库执行poolPingQuery字段中记录的测试SQL语句,从而检测真正的数据库连接对象是否依然可以正常使用。isValid()方法以及pingConnection()方法的代码如下所示:

  /**
   * 检测PooledConnection的有效性
   * Method to see if the connection is usable.
   *
   * @return True if the connection is usable
   */
  public boolean isValid() {
    //除了检测valid字段外,还要使用PooledDataSource.pingConnection()方法,向数据库发出测试语句来进一步判断
    return valid && realConnection != null && dataSource.pingConnection(this);
  }
/**
   * 用于测试连接
   * Method to check to see if a connection is still usable
   *
   * @param conn - the connection to check
   * @return True if the connection is still usable
   */
  protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;

    try {
      //检测真正的连接是否已关闭
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }

    if (result) {
      //判断要不要发不出测试语句
      if (poolPingEnabled) {
        //要
        //
        if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
          try {
            if (log.isDebugEnabled()) {
              log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            Connection realConn = conn.getRealConnection();
            try (Statement statement = realConn.createStatement()) {
              statement.executeQuery(poolPingQuery).close();
            }
            if (!realConn.getAutoCommit()) {
              realConn.rollback();
            }
            result = true;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
          } catch (Exception e) {
            log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
            try {
              conn.getRealConnection().close();
            } catch (Exception e2) {
              //ignore
            }
            result = false;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            }
          }
        }
      }
    }
    return result;
  }

最后需要注意的是PooledDataSource.forceCloseAll(),当修改PooledDataSource的字段是,例如数据库的URL用户名密码autoCommit配置等,都会调用PooledDataSource.forceCloseAll()方法将所有的数据库连接都关掉,同时也会将相应的PooledConnection对象都设置为无效,清空activeConnections集合和idleConnections集合。应用系统之后通过PoolDataSource.getConnection()获取连接时,会按照新的配置重新创建新的数据库连接以及对应的PooledConnection对象。forceCloseAll()方法的具体实现如下所示:

/**
   * `PooledDataSource.forceCloseAll()`,当修改`PooledDataSource`的字段是,例如数据库的`URL`、`用户名`、`密码`、`autoCommit`配置等,
   * 都会调用`PooledDataSource.forceCloseAll()`方法将所有的数据库连接都关掉,同时也会将相应的`PooledConnection`对象都设置为无效,
   * 清空`activeConnections`集合和`idleConnections`集合。应用系统之后通过`PoolDataSource.getConnection()`获取连接时,
   * 会按照新的配置重新创建新的数据库连接以及对应的`PooledConnection`对象.
   * Closes all active and idle connections in the pool.
   */
  public void forceCloseAll() {
    synchronized (state) {
      expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      for (int i = state.activeConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.activeConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
      for (int i = state.idleConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.idleConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource forcefully closed/removed all connections.");
    }
  }


Mybatis源码分析   基础支持层      Mybatis源码分析 工厂方法模式

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!