MyBatis數據源模塊源碼分析
數據源對象是比較復雜的對象,其創建過程相對比較復雜,對于 MyBatis 創建數據源,具體來講有如下難點:
- MyBatis 不但要能集成第三方的數據源組件,自身也提供了數據源的實現;
- 數據源的初始化參數較多,比較復雜;
在MyBatis中使用了工廠模式來實現數據源的創建,使用代理模式來幫助實現自己的數據源。
一 . MyBatis數據源模塊類結構
MyBatis數據源模塊的代碼全部位于org.apache.ibatis.datasource包下:

數據源模塊類主要結構如下(圖片來自于:https://blog.csdn.net/Zzzzz_xh/article/details/100531968):

DataSourceFactory是工廠的抽象接口:
/**
* 數據源工廠類的抽象接口,它有兩個實現類:UnpooledDataSourceFactory、PooledDataSourceFactory
* @author Clinton Begin
*/
public interface DataSourceFactory {
/**
* 設置數據源屬性
* @param props
*/
void setProperties(Properties props);
/**
* 獲取數據源
* @return
*/
DataSource getDataSource();
}
DataSourceFactory接口擁有UnpooledDataSourceFactory和PooledDataSourceFactory兩個實現類,UnpooledDataSourceFactory是未使用池化技術的數據源工廠類,PooledDataSourceFactory是使用了池化技術的數據源工廠類。它們兩分別用于創建UnpooledDataSource和PooledDataSource,它們都實現了javax.sql.DataSource JDBC提供的數據源標準,其中PooledDataSource就是MyBatis自己實現的數據庫連接池。
二. UnpooledDataSource源碼分析
在JDK中官方定義了一個數據源接口,市面上所有第三方連接池(數據源)都應該實現這個接口:
public interface DataSource extends CommonDataSource, Wrapper {
Connection getConnection() throws SQLException;
Connection getConnection(String username, String password)
throws SQLException;
}
UnpooledDataSource實際上是一個未使用池化技術的數據源,它實現了javax.sql.DataSource數據源標準(JDBC規定),但是在內部調用getConnection()方法時是通過創建Connection對象來實現的,由于每一次獲取都創建新的連接對象,連接對象并沒有進行復用,效率較低。其中UnpooledDataSource源碼如下(只保留關鍵代碼):
public class UnpooledDataSource implements DataSource {
private ClassLoader driverClassLoader;//驅動類的類加載器
private Properties driverProperties;//數據庫連接的相關信息
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();//緩存已注冊的數據庫驅動類
private String driver;
private String url;
private String username;
private String password;
private Boolean autoCommit;
private Integer defaultTransactionIsolationLevel;
private Integer defaultNetworkTimeout;
@Override
public Connection getConnection() throws SQLException {
return doGetConnection(username, password);
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return doGetConnection(username, password);
}
private Connection doGetConnection(Properties properties) throws SQLException {
//初始化驅動
initializeDriver();
//重點在這里,連接對象使用過 DriverManager.getConnection創建的新的連接對象
Connection connection = DriverManager.getConnection(url, properties);
//配置連接
configureConnection(connection);
return connection;
}
...
}
三. PooledDataSource源碼分析
PooledDataSource是Mybatis自己實現的數據庫連接池,在分析它的源碼之前我們首先要清楚作為一個連接池需要實現哪些功能。
作為一個數據庫連接池,其最核心的功能是要做到Connection的復用,當用戶調用連接池的getConnection獲取連接時會在池中去拿,當用戶調用Connection的close()方法時就會將該連接歸還至連接池。而PooledDataSource實現上述功能需要借助另外兩個類來實現:
PoolState:用于保存線程池的相關狀態。PooledConnection:Connection的加強類,用于加強原生close等方法,從而實現數據庫連接的復用。
3.1 PoolState
在PoolState最核心的的是idleConnections和activeConnections,他們分別是存儲空閑連接和非空閑連接的集合(在后文中出于習慣考慮,將其描述為空閑隊列和活動隊列)。
public class PoolState {
protected PooledDataSource dataSource;
//空閑連接隊列
protected final List<PooledConnection> idleConnections = new ArrayList<>();
//活動隊列
protected final List<PooledConnection> activeConnections = new ArrayList<>();
//請求的次數
protected long requestCount = 0;
//累計獲得連接的時長
protected long accumulatedRequestTime = 0;
//累計使用連接的時間。從連接取出到歸還,算一次使用時間
protected long accumulatedCheckoutTime = 0;
//使用連接超時的次數
protected long claimedOverdueConnectionCount = 0;
//累計超時時間
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
//累計等待時間
protected long accumulatedWaitTime = 0;
//等待次數
protected long hadToWaitCount = 0;
//無效的連接次數
protected long badConnectionCount = 0;
public PoolState(PooledDataSource dataSource) {
this.dataSource = dataSource;
}
...
}
3.2 PooledDataSource
PooledDataSource最主要是要理解getConnection方法獲取連接對象的邏輯,這里給出該方法的執行流程圖:

PooledDataSource類獲取連接時的核心源碼如下:
public class PooledDataSource implements DataSource {
//Log是是適配器模式的抽象接口
private static final Log log = LogFactory.getLog(PooledDataSource.class);
//線程池的相關狀態
private final PoolState state = new PoolState(this);
//沒有池化的數據源
private final UnpooledDataSource dataSource;
// OPTIONAL CONFIGURATION FIELDS
//在任意時間可存在的活動(正在使用)連接數量,默認值:10
protected int poolMaximumActiveConnections = 10;
//任意時間可能存在的空閑連接數,默認是5
protected int poolMaximumIdleConnections = 5;
//在被強制返回之前,池中連接被檢出(checked out)時間,默認值:20000 毫秒(即 20 秒)
protected int poolMaximumCheckoutTime = 20000;
//這是一個底層設置,如果獲取連接花費了相當長的時間,連接池會打印狀態日志并重新嘗試獲取一個連接(避免在誤配置的情況下一直失敗且不打印日志),默認值:20000 毫秒(即 20 秒)。
protected int poolTimeToWait = 20000;
//這是一個關于壞連接容忍度的底層設置, 作用于每一個嘗試從緩存池獲取連接的線程。 如果這個線程獲取到的是一個壞的連接,那么這個數據源允許這個線程嘗試重新獲取一個新的連接,但是這個重新嘗試的次數不應該超過
// poolMaximumIdleConnections 與 poolMaximumLocalBadConnectionTolerance 之和。 默認值:3(新增于 3.4.5)
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 發送到數據庫的偵測查詢,用來檢驗連接是否正常工作并準備接受請求。默認是“NO PING QUERY SET”,這會導致多數數據庫驅動出錯時返回恰當的錯誤消息。
protected String poolPingQuery = "NO PING QUERY SET";
//是否啟用偵測查詢。若開啟,需要設置 poolPingQuery 屬性為一個可執行的 SQL 語句(最好是一個速度非常快的 SQL 語句),默認值:false。
protected boolean poolPingEnabled;
// 配置 poolPingQuery 的頻率。可以被設置為和數據庫連接超時時間一樣,來避免不必要的偵測,默認值:0(即所有連接每一時刻都被偵測 — 當然僅當 poolPingEnabled 為 true 時適用)。
protected int poolPingConnectionsNotUsedFor;
//根據數據庫URL、用戶名、密碼生成一個Hash值,唯一標識一個連接池,由這個連接池產生的連接對象都會帶上這個值
private int expectedConnectionTypeCode;
@Override
public Connection getConnection() throws SQLException {
return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return popConnection(username, password).getProxyConnection();
}
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
//此次獲取任務中,獲取到失效連接的次數
int localBadConnectionCount = 0;
//最外面是while死循環,如果一直拿不到connection,則不斷嘗試
while (conn == null) {
//使用state加鎖,也就是說下面代碼對state的操作都是線程安全的
synchronized (state) {
if (!state.idleConnections.isEmpty()) {
//連接池中擁有空閑連接
//拿出空閑隊列中的第一個連接
conn = state.idleConnections.remove(0);
//如果是Debu級別,則輸出日志
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
/**
* 連接池中沒有空閑連接
*/
if (state.activeConnections.size() < poolMaximumActiveConnections) {
//如果當前活動的線程小于所約定的最大活動線程數,則創建一個連接
//創建代理對象
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {
// 如果連接池中活動連接數到達極限,則不能創建連接
// 拿去activeConnections隊列中最老的連接對象
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
// 獲取此連接已取出的時間
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// 如果此連接從連接池中獲取出來的時間超過限制的最大時間
//將過期的連接數+1
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
//如果超時的連接事務不是自動提交的
try {
//回滾事務
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happened.
Wrap the bad connection with a new PooledConnection, this will help
to not interrupt current executing thread and give current thread a
chance to join the next competition for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
//重新封裝一個新的代理連接對象(這里有點疑問?新代理對象與老代理對象(PooledConnection)共用目標對象(Connection)不會帶來線程安全問題嗎?)
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
//設置創建時間
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
//設置最后使用的時間
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
//將超時的代理連接對象(PooledConnection)作廢
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// 如果此連接從連接池中獲取出來的時間沒有超過限制的最大時間,則必須等待
try {
if (!countedWait) {
//等待數量+1
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
//當前線程釋放掉state鎖,等待poolTimeToWait
state.wait(poolTimeToWait);
//計算累計等待時間
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
//如果捕獲到中斷異常則跳出循環
break;
}
}
}
}
if (conn != null) {
//如果已經拿到連接對象了
if (conn.isValid()) {
//connection是有效的
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//設置當前connection對象的TypeCode,TypeCode是URL+用戶名+密碼的HashCode,目的是在歸還的時候判斷,當前連接的參數是否與數據源相同
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
//connection是無效的
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
//整個數據源累計獲取到無效連接的次數+1
state.badConnectionCount++;
//此次獲取任務中 獲取失效連接的次數+1
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
//此次連接獲取任務中 獲取失效連接的次數 大于 poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance 則拋出異常,停止嘗試
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
//如果經過上面一系列的操作還沒有獲取到對象,則拋出SQLException
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
}
3.3 PooledConnection
在PooledDataSource中我們理解了Connection獲取流程,當用戶調用close方法時需要將該對象歸還至數據庫,而這一功能需要通過PooledConnection類來實現:
class PooledConnection implements InvocationHandler {
private static final String CLOSE = "close";
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
private final int hashCode;
//當前連接所屬的數據源,最后會歸還至該數據源
private final PooledDataSource dataSource;
//真正的連接對象
private final Connection realConnection;
//代理的連接對象
private final Connection proxyConnection;
//從數據源中取出來的時間戳
private long checkoutTimestamp;
//連接創建的時間戳
private long createdTimestamp;
//連接最后一次使用的時間戳
private long lastUsedTimestamp;
//根據數據庫URL、用戶名、密碼生成一個Hash值,唯一標識一個連接池
private int connectionTypeCode;
//連接是否有效
private boolean valid;
...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.equals(methodName)) {
//如果用戶調用的是close方法,則將調用PooledDataSource中的pushConnection方法
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
/**
* 如果調用的不是Object的方法,則檢查該連接是否有效,如果無效則拋出異常,
* 這也是PooledDataSource.getConnection當沒有空閑連接,將超時連接作廢機制的關鍵
*/
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection();
}
//調用被代理的Connection中的方法
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
private void checkConnection() throws SQLException {
if (!valid) {
throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
}
}
}
可以看到通過invoke方法的加強,當用戶調用close()方法時會通過PooledDataSource中的pushConnection()方法歸還連接,連接歸還的流程圖如下:

pushConnection方法源碼如下:
/**
* 將連接對象歸還給連接池(實際上是將連接從active隊列中移到idle隊列中)
*
* @param conn
* @throws SQLException
*/
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) {
//將當前連接從active隊列中移除
state.activeConnections.remove(conn);
if (conn.isValid()) {
//連接是有效的
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
//空閑的數量小于最大空閑值 且 連接對象的typeCode與數據源期望的TypeCode相同
//記錄當前連接使用的時間
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
//如果連接對象的事務是非自動提交的,則回滾事務
conn.getRealConnection().rollback();
}
//重新封裝一個新的代理連接對象
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
//將新建的代理連接對象放入空閑隊列
state.idleConnections.add(newConn);
//設置創建的時間戳
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
//設置最后使用的時間戳
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
//老的代理對象作廢
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
//換新在state鎖上的等待的線程
state.notifyAll();
} else {
//空閑的數量大于等于最大空閑值 或者 連接對象的typeCode與數據源期望的TypeCode不相同
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//關閉這個連接
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
//將代理連接作廢
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
}
}
詳細代碼注釋請移步至:https://github.com/tianjindong/mybatis-source-annotation

浙公網安備 33010602011771號