code-learning/mybatis/21-mybatis-SQL 执行(一)之 Executor.md

73 KiB
Raw Permalink Blame History

精尽 MyBatis 源码分析 —— SQL 执行(一)之 Executor

1. 概述

从本文开始,我们来分享 SQL 执行的流程。在 《精尽 MyBatis 源码分析 —— 项目结构一览》 中,我们简单介绍这个流程如下:

对应 executorcursor 模块。前者对应执行器,后者对应执行结果的游标

SQL 语句的执行涉及多个组件 ,其中比较重要的是 Executor、StatementHandler、ParameterHandler 和 ResultSetHandler 。

  • Executor 主要负责维护一级缓存和二级缓存,并提供事务管理的相关操作,它会将数据库相关操作委托给 StatementHandler完成。
  • StatementHandler 首先通过 ParameterHandler 完成 SQL 语句的实参绑定,然后通过 java.sql.Statement 对象执行 SQL 语句并得到结果集,最后通过 ResultSetHandler 完成结果集的映射,得到结果对象并返回。

整体过程如下图:

![整体过程](21-mybatis-SQL 执行(一)之 Executor.assets/05.png)整体过程

下面,我们在看看 executor 包下的列情况,如下图所示:![executor 包](21-mybatis-SQL 执行(一)之 Executor.assets/01.png)executor

  • 正如该包下的分包情况,每个包对应一个功能。

  • statement 包,实现向数据库发起 SQL 命令。

  • parameter
    

    包,实现设置 PreparedStatement 的占位符参数。

  • keygen 包,实现数据库主键生成( 获得 )的功能。

  • resultset 包,实现 ResultSet 结果集的处理,将其映射成对应的结果对象。

  • result 包,结果的处理,被 resultset 包所调用。可能胖友会好奇为啥会有 resultsetresult 两个“重叠”的包。答案见 《精尽 MyBatis 源码分析 —— SQL 执行(四)之 ResultSetHandler》

  • loader 包,实现延迟加载的功能。

  • 根目录Executor 接口及其实现类,作为 SQL 执行的核心入口。

考虑到整个 executor 包的代码量近 5000 行,所以我们将每一个子包,作为一篇文章,逐包解析。所以,本文我们先来分享 根目录,也就是 Executor 接口及其实现类。

2. Executor

org.apache.ibatis.executor.Executor ,执行器接口。代码如下:

// Executor.java

public interface Executor {

    // 空 ResultHandler 对象的枚举
    ResultHandler NO_RESULT_HANDLER = null;

    // 更新 or 插入 or 删除,由传入的 MappedStatement 的 SQL 所决定
    int update(MappedStatement ms, Object parameter) throws SQLException;

    // 查询,带 ResultHandler + CacheKey + BoundSql
    <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
    // 查询,带 ResultHandler
    <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
    // 查询,返回值为 Cursor
    <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;

    // 刷入批处理语句
    List<BatchResult> flushStatements() throws SQLException;

    // 提交事务
    void commit(boolean required) throws SQLException;
    // 回滚事务
    void rollback(boolean required) throws SQLException;

    // 创建 CacheKey 对象
    CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
    // 判断是否缓存
    boolean isCached(MappedStatement ms, CacheKey key);
    // 清除本地缓存
    void clearLocalCache();

    // 延迟加载
    void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);

    // 获得事务
    Transaction getTransaction();
    // 关闭事务
    void close(boolean forceRollback);
    // 判断事务是否关闭
    boolean isClosed();

    // 设置包装的 Executor 对象
    void setExecutorWrapper(Executor executor);

}
  • 读和写操作相关的方法
  • 事务相关的方法
  • 缓存相关的方法
  • 设置延迟加载的方法
  • 设置包装的 Executor 对象的方法

Executor 的实现类如下图所示:![Executor 类图](21-mybatis-SQL 执行(一)之 Executor.assets/02.png)Executor 类图

  • 我们可以看到Executor 的直接子类有 BaseExecutor 和 CachingExecutor 两个。
  • 实际上CachingExecutor 在 BaseExecutor 的基础上,实现二级缓存功能。
  • 在下文中BaseExecutor 的本地缓存,就是一级缓存。

下面,我们按照先看 BaseExecutor 侧的实现类的源码解析,再看 CachingExecutor 的。

3. BaseExecutor

org.apache.ibatis.executor.BaseExecutor ,实现 Executor 接口,提供骨架方法,从而使子类只要实现指定的几个抽象方法即可。

3.1 构造方法

// BaseExecutor.java

/**
 * 事务对象
 */
protected Transaction transaction;
/**
 * 包装的 Executor 对象
 */
protected Executor wrapper;

/**
 * DeferredLoad( 延迟加载 ) 队列
 */
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
/**
 * 本地缓存,即一级缓存
 */
protected PerpetualCache localCache;
/**
 * 本地输出类型的参数的缓存
 */
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;

/**
 * 记录嵌套查询的层级
 */
protected int queryStack;
/**
 * 是否关闭
 */
private boolean closed;

protected BaseExecutor(Configuration configuration, Transaction transaction) {
    this.transaction = transaction;
    this.deferredLoads = new ConcurrentLinkedQueue<>();
    this.localCache = new PerpetualCache("LocalCache");
    this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
    this.closed = false;
    this.configuration = configuration;
    this.wrapper = this; // 自己
}
  • 和延迟加载相关,后续文章,详细解析。详细解析,见

    《精尽 MyBatis 源码分析 —— SQL 执行(五)之延迟加载》

    • queryStack 属性,记录递归嵌套查询的层级。
    • deferredLoads 属性DeferredLoad( 延迟加载 ) 队列。
  • wrapper 属性,在构造方法中,初始化为 this ,即自己。

  • localCache 属性,本地缓存,即一级缓存。那什么是一级缓存呢?

    基于 《MyBatis 的一级缓存实现详解及使用注意事项》 进行修改

    每当我们使用 MyBatis 开启一次和数据库的会话MyBatis 会创建出一个 SqlSession 对象表示一次数据库会话,而每个 SqlSession 都会创建一个 Executor 对象

    在对数据库的一次会话中,我们有可能会反复地执行完全相同的查询语句,如果不采取一些措施的话,每一次查询都会查询一次数据库,而我们在极短的时间内做了完全相同的查询,那么它们的结果极有可能完全相同,由于查询一次数据库的代价很大,这有可能造成很大的资源浪费。

    为了解决这一问题减少资源的浪费MyBatis 会在表示会话的SqlSession 对象中建立一个简单的缓存,将每次查询到的结果结果缓存起来,当下次查询的时候,如果判断先前有个完全一样的查询,会直接从缓存中直接将结果取出,返回给用户,不需要再进行一次数据库查询了。😈 注意,这个“简单的缓存”就是一级缓存,且默认开启,无法关闭

    如下图所示MyBatis 会在一次会话的表示 —— 一个 SqlSession 对象中创建一个本地缓存( localCache ),对于每一次查询,都会尝试根据查询的条件去本地缓存中查找是否在缓存中,如果在缓存中,就直接从缓存中取出,然后返回给用户;否则,从数据库读取数据,将查询结果存入缓存并返回给用户。

    ![整体过程](21-mybatis-SQL 执行(一)之 Executor.assets/03.png)整体过程

    • 关于这段话,胖友要理解 SqlSession 和 Executor 和一级缓存的关系。
    • 😈 另外,下文,我们还会介绍二级缓存是什么。
  • transaction 属性,事务对象。该属性,是通过构造方法传入。为什么呢?待我们看 org.apache.ibatis.session.session 包。

3.2 clearLocalCache

#clearLocalCache() 方法,清理一级(本地)缓存。代码如下:

// BaseExecutor.java

@Override
public void clearLocalCache() {
    if (!closed) {
        // 清理 localCache
        localCache.clear();
        // 清理 localOutputParameterCache
        localOutputParameterCache.clear();
    }
}

3.3 createCacheKey

#createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) 方法,创建 CacheKey 对象。代码如下:

// BaseExecutor.java

@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // <1> 创建 CacheKey 对象
    CacheKey cacheKey = new CacheKey();
    // <2> 设置 id、offset、limit、sql 到 CacheKey 对象中
    cacheKey.update(ms.getId());
    cacheKey.update(rowBounds.getOffset());
    cacheKey.update(rowBounds.getLimit());
    cacheKey.update(boundSql.getSql());
    // <3> 设置 ParameterMapping 数组的元素对应的每个 value 到 CacheKey 对象中
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    // mimic DefaultParameterHandler logic 这块逻辑,和 DefaultParameterHandler 获取 value 是一致的。
    for (ParameterMapping parameterMapping : parameterMappings) {
        if (parameterMapping.getMode() != ParameterMode.OUT) {
            Object value;
            String propertyName = parameterMapping.getProperty();
            if (boundSql.hasAdditionalParameter(propertyName)) {
                value = boundSql.getAdditionalParameter(propertyName);
            } else if (parameterObject == null) {
                value = null;
            } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
                value = parameterObject;
            } else {
                MetaObject metaObject = configuration.newMetaObject(parameterObject);
                value = metaObject.getValue(propertyName);
            }
            cacheKey.update(value);
        }
    }
    // <4> 设置 Environment.id 到 CacheKey 对象中
    if (configuration.getEnvironment() != null) {
        // issue #176
        cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
}
  • <1> 处,创建 CacheKey 对象。关于 CacheKey 类,在 《精尽 MyBatis 源码分析 —— 缓存模块》 已经详细解析。
  • <2> 处,设置 idoffsetlimitsql 到 CacheKey 对象中。
  • <3> 处,设置 ParameterMapping 数组的元素对应的每个 value 到 CacheKey 对象中。注意,这块逻辑,和 DefaultParameterHandler 获取 value 是一致的。
  • <4> 处,设置 Environment.id 到 CacheKey 对象中。

3.4 isCached

#isCached(MappedStatement ms, CacheKey key) 方法,判断一级缓存是否存在。代码如下:

// BaseExecutor.java

@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
    return localCache.getObject(key) != null;
}

3.5 query

#query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) 方法,读操作。代码如下:

// BaseExecutor.java

@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    // <1> 获得 BoundSql 对象
    BoundSql boundSql = ms.getBoundSql(parameter);
    // <2> 创建 CacheKey 对象
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    // <3> 查询
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
  • <1> 处,调用 MappedStatement#getBoundSql(Object parameterObject) 方法,获得 BoundSql 对象。
  • <2> 处,调用 #createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) 方法,创建 CacheKey 对象。
  • <3> 处,调用 #query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) 方法,读操作。通过这样的方式,两个 #query(...) 方法,实际是统一的。

#query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) 方法,读操作。代码如下:

// BaseExecutor.java

@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    // <1> 已经关闭,则抛出 ExecutorException 异常
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // <2> 清空本地缓存,如果 queryStack 为零,并且要求清空本地缓存。
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
        clearLocalCache();
    }
    List<E> list;
    try {
        // <3> queryStack + 1
        queryStack++;
        // <4.1> 从一级缓存中,获取查询结果
        list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
        // <4.2> 获取到,则进行处理
        if (list != null) {
            handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
        // <4.3> 获得不到,则从数据库中查询
        } else {
            list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
        }
    } finally {
        // <5> queryStack - 1
        queryStack--;
    }
    if (queryStack == 0) {
        // <6.1> 执行延迟加载
        for (DeferredLoad deferredLoad : deferredLoads) {
            deferredLoad.load();
        }
        // issue #601
        // <6.2> 清空 deferredLoads
        deferredLoads.clear();
        // <7> 如果缓存级别是 LocalCacheScope.STATEMENT ,则进行清理
        if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
            // issue #482
            clearLocalCache();
        }
    }
    return list;
}
  • <1> 处,已经关闭,则抛出 ExecutorException 异常。

  • <2> 处,调用 #clearLocalCache() 方法,清空本地缓存,如果 queryStack 为零,并且要求清空本地缓存。例如:<select flushCache="true"> ... </a>

  • <3> 处,queryStack + 1 。

  • <4.1>
    

    处,从一级缓存

    localCache
    

    中,获取查询结果。

    • <4.2> 处,获取到,则进行处理。对于 #handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) 方法,是处理存储过程的情况,所以我们就忽略。
    • <4.3> 处,获得不到,则调用 #queryFromDatabase() 方法,从数据库中查询。详细解析,见 「3.5.1 queryFromDatabase」
  • <5> 处,queryStack - 1 。

  • <6.1>
    

    处,遍历 DeferredLoad 队列,逐个调用

    DeferredLoad#load()
    

    方法,执行延迟加载。详细解析,见

    《精尽 MyBatis 源码分析 —— SQL 执行(五)之延迟加载》

    • <6.2> 处,清空 DeferredLoad 队列。
  • <7> 处,如果缓存级别是 LocalCacheScope.STATEMENT ,则调用 #clearLocalCache() 方法,清空本地缓存。默认情况下,缓存级别是 LocalCacheScope.SESSION 。代码如下:

    // Configuration.java
    
    /**
     * {@link BaseExecutor} 本地缓存范围
     */
    protected LocalCacheScope localCacheScope = LocalCacheScope.SESSION;
    
    // LocalCacheScope.java
    
    public enum LocalCacheScope {
    
        /**
         * 会话级
         */
        SESSION,
        /**
         * SQL 语句级
         */
        STATEMENT
    
    }
    

3.5.1 queryFromDatabase

#queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) 方法,从数据库中读取操作。代码如下:

// BaseExecutor.java

private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    // <1> 在缓存中,添加占位对象。此处的占位符,和延迟加载有关,可见 `DeferredLoad#canLoad()` 方法
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
        // <2> 执行读操作
        list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
        // <3> 从缓存中,移除占位对象
        localCache.removeObject(key);
    }
    // <4> 添加到缓存中
    localCache.putObject(key, list);
    // <5> 暂时忽略,存储过程相关
    if (ms.getStatementType() == StatementType.CALLABLE) {
        localOutputParameterCache.putObject(key, parameter);
    }
    return list;
}
  • <1> 处,在缓存中,添加占位对象。此处的占位符,和延迟加载有关,后续可见 DeferredLoad#canLoad() 方法。

    // BaseExecutor.java
    
    public enum ExecutionPlaceholder {
    
        /**
         * 正在执行中的占位符
         */
        EXECUTION_PLACEHOLDER
    
    }
    
  • <2> 处,调用 #doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) 执行读操作。这是个抽象方法,由子类实现。

  • <3> 处,从缓存中,移除占位对象。

  • <4> 处,添加结果到缓存中。

  • <5> 处,暂时忽略,存储过程相关。

3.5.2 doQuery

// BaseExecutor.java

protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
        throws SQLException;

3.6 queryCursor

#queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) 方法,执行查询,返回的结果为 Cursor 游标对象。代码如下:

// BaseExecutor.java

@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
    // <1> 获得 BoundSql 对象
    BoundSql boundSql = ms.getBoundSql(parameter);
    // 执行查询
    return doQueryCursor(ms, parameter, rowBounds, boundSql);
}
  • <1> 处,调用 MappedStatement#getBoundSql(Object parameterObject) 方法,获得 BoundSql 对象。
  • <2> 处,调用 #doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) 方法,执行读操作。这是个抽象方法,由子类实现。

3.6.1 doQueryCursor

// BaseExecutor.java

protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql)
        throws SQLException;

3.7 update

#update(MappedStatement ms, Object parameter) 方法,执行写操作。代码如下:

// BaseExecutor.java

@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    // <1> 已经关闭,则抛出 ExecutorException 异常
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // <2> 清空本地缓存
    clearLocalCache();
    // <3> 执行写操作
    return doUpdate(ms, parameter);
}
  • <1> 处,已经关闭,则抛出 ExecutorException 异常。
  • <2> 处,调用 #clearLocalCache() 方法,清空本地缓存。因为,更新后,可能缓存会失效。但是,又没很好的办法,判断哪一些失效。所以,最稳妥的做法,就是全部清空。
  • <3> 处,调用 #doUpdate(MappedStatement ms, Object parameter) 方法,执行写操作。这是个抽象方法,由子类实现。

3.7.1 doUpdate

// BaseExecutor.java

protected abstract int doUpdate(MappedStatement ms, Object parameter)
        throws SQLException;

3.8 flushStatements

#flushStatements() 方法,刷入批处理语句。代码如下:

// BaseExecutor.java

@Override
public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
}

public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    // <1> 已经关闭,则抛出 ExecutorException 异常
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // <2> 执行刷入批处理语句
    return doFlushStatements(isRollBack);
}
  • isRollBack 属性,目前看下来没什么逻辑。唯一看到在 BatchExecutor 中,如果 isRollBack = true ,则不执行刷入批处理语句。有点奇怪。
  • <1> 处,已经关闭,则抛出 ExecutorException 异常。
  • <2> 处,调用 #doFlushStatements(boolean isRollback) 方法,执行刷入批处理语句。这是个抽象方法,由子类实现。

3.8.1 doFlushStatements

// BaseExecutor.java

protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
        throws SQLException;

至此,我们已经看到了 BaseExecutor 所定义的四个抽象方法

  • 「3.5.2 doQuery」
  • 「3.6.1 doQueryCursor」
  • 「3.7.1 doUpdate」
  • 「3.8.1 doFlushStatements」

3.9 getTransaction

#getTransaction() 方法,获得事务对象。代码如下:

// BaseExecutor.java

@Override
public Transaction getTransaction() {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    return transaction;
}

3.9.1 commit

// BaseExecutor.java

@Override
public void commit(boolean required) throws SQLException {
    // 已经关闭,则抛出 ExecutorException 异常
    if (closed) {
        throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    // 清空本地缓存
    clearLocalCache();
    // 刷入批处理语句
    flushStatements();
    // 是否要求提交事务。如果是,则提交事务。
    if (required) {
        transaction.commit();
    }
}

3.9.2 rollback

// BaseExecutor.java

public void rollback(boolean required) throws SQLException {
    if (!closed) {
        try {
            // 清空本地缓存
            clearLocalCache();
            // 刷入批处理语句
            flushStatements(true);
        } finally {
            if (required) {
                // 是否要求回滚事务。如果是,则回滚事务。
                transaction.rollback();
            }
        }
    }
}

3.10 close

#close() 方法,关闭执行器。代码如下:

// BaseExecutor.java

@Override
public void close(boolean forceRollback) {
    try {
        // 回滚事务
        try {
            rollback(forceRollback);
        } finally {
            // 关闭事务
            if (transaction != null) {
                transaction.close();
            }
        }
    } catch (SQLException e) {
        // Ignore.  There's nothing that can be done at this point.
        log.warn("Unexpected exception on closing transaction.  Cause: " + e);
    } finally {
        // 置空变量
        transaction = null;
        deferredLoads = null;
        localCache = null;
        localOutputParameterCache = null;
        closed = true;
    }
}

3.10.1 isClosed

// BaseExecutor.java

@Override
public boolean isClosed() {
    return closed;
}

3.11 deferLoad

详细解析,见 《精尽 MyBatis 源码分析 —— SQL 执行(五)之延迟加载》

3.12 setExecutorWrapper

#setExecutorWrapper(Executor wrapper) 方法,设置包装器。代码如下:

// BaseExecutor.java

@Override
public void setExecutorWrapper(Executor wrapper) {
    this.wrapper = wrapper;
}

3.13 其它方法

// BaseExecutor.java

// 获得 Connection 对象
protected Connection getConnection(Log statementLog) throws SQLException {
    // 获得 Connection 对象
    Connection connection = transaction.getConnection();
    // 如果 debug 日志级别,则创建 ConnectionLogger 对象,进行动态代理
    if (statementLog.isDebugEnabled()) {
        return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
        return connection;
    }
}

// 设置事务超时时间
protected void applyTransactionTimeout(Statement statement) throws SQLException {
    StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
}

// 关闭 Statement 对象
protected void closeStatement(Statement statement) {
    if (statement != null) {
        try {
            if (!statement.isClosed()) {
                statement.close();
            }
        } catch (SQLException e) {
            // ignore
        }
    }
}

4. SimpleExecutor

org.apache.ibatis.executor.SimpleExecutor ,继承 BaseExecutor 抽象类,简单的 Executor 实现类。

  • 每次开始读或写操作,都创建对应的 Statement 对象。
  • 执行完成后,关闭该 Statement 对象。

4.1 构造方法

// SimpleExecutor.java

public SimpleExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
}

4.2 doQuery

老艿艿:从此处开始,我们会看到 StatementHandler 相关的调用,胖友可以结合 《精尽 MyBatis 源码分析 —— SQL 执行(二)之 StatementHandler》 一起看。

// SimpleExecutor.java

@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
        Configuration configuration = ms.getConfiguration();
        // <1> 创建 StatementHandler 对象
        StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
        // <2> 初始化 StatementHandler 对象
        stmt = prepareStatement(handler, ms.getStatementLog());
        // <3> 执行 StatementHandler  ,进行读操作
        return handler.query(stmt, resultHandler);
    } finally {
        // <4> 关闭 StatementHandler 对象
        closeStatement(stmt);
    }
}
  • <1> 处,调用 Configuration#newStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) 方法,创建 StatementHandler 对象。

  • <2> 处,调用 #prepareStatement(StatementHandler handler, Log statementLog) 方法,初始化 StatementHandler 对象。代码如下:

    // SimpleExecutor.java
    
    private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
        Statement stmt;
        // <2.1> 获得 Connection 对象
        Connection connection = getConnection(statementLog);
        // <2.2> 创建 Statement 或 PrepareStatement 对象
        stmt = handler.prepare(connection, transaction.getTimeout());
        // <2.3> 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
        handler.parameterize(stmt);
        return stmt;
    }
    
    • <2.1> 处,调用 #getConnection(Log statementLog) 方法,获得 Connection 对象。
    • <2.2> 处,调用 StatementHandler#prepare(Connection connection, Integer transactionTimeout) 方法,创建 Statement 或 PrepareStatement 对象。
    • <2.3> 处,调用 StatementHandler#prepare(Statement statement) 方法,设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符。
  • <3> 处,调用 StatementHandler#query(Statement statement, ResultHandler resultHandler) 方法,进行读操作

  • <4> 处,调用 #closeStatement(Statement stmt) 方法,关闭 StatementHandler 对象。

4.3 doQueryCursor

// SimpleExecutor.java

@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    // 创建 StatementHandler 对象
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    // 初始化 StatementHandler 对象
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    // 设置 Statement ,如果执行完成,则进行自动关闭
    stmt.closeOnCompletion();
    // 执行 StatementHandler  ,进行读操作
    return handler.queryCursor(stmt);
}
  • #doQuery(...) 方法的思路是一致的,胖友自己看下。

4.4 doUpdate

// SimpleExecutor.java

@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
        Configuration configuration = ms.getConfiguration();
        // 创建 StatementHandler 对象
        StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
        // 初始化 StatementHandler 对象
        stmt = prepareStatement(handler, ms.getStatementLog());
        // <3> 执行 StatementHandler ,进行写操作
        return handler.update(stmt);
    } finally {
        // 关闭 StatementHandler 对象
        closeStatement(stmt);
    }
}
  • 相比 #doQuery(...) 方法,差异点在 <3> 处,换成了调用 StatementHandler#update(Statement statement) 方法,进行写操作

4.5 doFlushStatements

// SimpleExecutor.java

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    return Collections.emptyList();
}
  • 不存在批量操作的情况,所以直接返回空数组。

5. ReuseExecutor

org.apache.ibatis.executor.ReuseExecutor ,继承 BaseExecutor 抽象类,可重用的 Executor 实现类。

  • 每次开始读或写操作,优先从缓存中获取对应的 Statement 对象。如果不存在,才进行创建。
  • 执行完成后,不关闭该 Statement 对象。
  • 其它的,和 SimpleExecutor 是一致的。

5.1 构造方法

// ReuseExecutor.java

/**
 * Statement 的缓存
 *
 * KEY SQL
 */
private final Map<String, Statement> statementMap = new HashMap<>();

public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
}

5.2 doQuery

// ReuseExecutor.java

@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    // 创建 StatementHandler 对象
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    // <1> 初始化 StatementHandler 对象
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    // 执行 StatementHandler  ,进行读操作
    return handler.query(stmt, resultHandler);
}
  • 差异一,在于 <1> 处,调用 #prepareStatement(StatementHandler handler, Log statementLog) 方法,初始化 StatementHandler 对象。代码如下:

    // ReuseExecutor.java
    
    private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
        Statement stmt;
        BoundSql boundSql = handler.getBoundSql();
        String sql = boundSql.getSql();
        // 存在
        if (hasStatementFor(sql)) {
            // <1.1> 从缓存中获得 Statement 或 PrepareStatement 对象
            stmt = getStatement(sql);
            // <1.2> 设置事务超时时间
            applyTransactionTimeout(stmt);
        // 不存在
        } else {
            // <2.1> 获得 Connection 对象
            Connection connection = getConnection(statementLog);
            // <2.2> 创建 Statement 或 PrepareStatement 对象
            stmt = handler.prepare(connection, transaction.getTimeout());
            // <2.3> 添加到缓存中
            putStatement(sql, stmt);
        }
        // <2> 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
        handler.parameterize(stmt);
        return stmt;
    }
    
    • 调用 #hasStatementFor(String sql) 方法,判断是否存在对应的 Statement 对象。代码如下:

      // ReuseExecutor.java
      
      private boolean hasStatementFor(String sql) {
          try {
              return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
          } catch (SQLException e) {
              return false;
          }
      }
      
      • 并且,要求连接未关闭
    • 存在

      • <1.1> 处,调用 #getStatement(String s) 方法,获得 Statement 对象。代码如下:

        // ReuseExecutor.java
        
        private Statement getStatement(String s) {
            return statementMap.get(s);
        }
        
        • x
      • 【差异】<1.2> 处,调用 #applyTransactionTimeout(Statement stmt) 方法,设置事务超时时间。

    • 不存在

      • <2.1> 处,获得 Connection 对象。

      • <2.2> 处,调用 StatementHandler#prepare(Connection connection, Integer transactionTimeout) 方法,创建 Statement 或 PrepareStatement 对象。

      • 【差异】<2.3> 处,调用 #putStatement(String sql, Statement stmt) 方法,添加 Statement 对象到缓存中。代码如下:

        // ReuseExecutor.java
        
        private void putStatement(String sql, Statement stmt) {
            statementMap.put(sql, stmt);
        }
        
        • x
    • <2> 处,调用 StatementHandler#prepare(Statement statement) 方法,设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符。

  • 差异二,在执行完数据库操作后,不会关闭 Statement 。

5.3 doQueryCursor

// ReuseExecutor.java

@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    // 创建 StatementHandler 对象
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    // 初始化 StatementHandler 对象
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    // 执行 StatementHandler  ,进行读操作
    return handler.queryCursor(stmt);
}

5.4 doUpdate

// ReuseExecutor.java

@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    // 创建 StatementHandler 对象
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    // 初始化 StatementHandler 对象
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    // 执行 StatementHandler  ,进行写操作
    return handler.update(stmt);
}

5.5 doFlushStatements

// ReuseExecutor.java

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    // 关闭缓存的 Statement 对象们
    for (Statement stmt : statementMap.values()) {
        closeStatement(stmt);
    }
    statementMap.clear();
    // 返回空集合
    return Collections.emptyList();
}
  • ReuseExecutor 考虑到重用性,但是 Statement 最终还是需要有地方关闭。答案就在 #doFlushStatements(boolean isRollback) 方法中。而 BaseExecutor 在关闭 #close() 方法中,最终也会调用该方法,从而完成关闭缓存的 Statement 对象们 。
  • 另外BaseExecutor 在提交或者回滚事务方法中,最终也会调用该方法,也能完成关闭缓存的 Statement 对象们。

6. BatchExecutor

org.apache.ibatis.executor.BatchExecutor ,继承 BaseExecutor 抽象类,批量执行的 Executor 实现类。

FROM 祖大俊 《Mybatis3.3.x技术内幕五鼠闹东京之执行器Executor设计原本》

执行update没有selectJDBC批处理不支持select将所有sql都添加到批处理中addBatch()等待统一执行executeBatch()它缓存了多个Statement对象每个Statement对象都是addBatch()完毕后等待逐一执行executeBatch()批处理的BatchExecutor相当于维护了多个桶每个桶里都装了很多属于自己的SQL就像苹果蓝里装了很多苹果番茄蓝里装了很多番茄最后再统一倒进仓库。可以是Statement或PrepareStatement对象

6.1 构造方法

// BatchExecutor.java

/**
 * Statement 数组
 */
private final List<Statement> statementList = new ArrayList<>();
/**
 * BatchResult 数组
 *
 * 每一个 BatchResult 元素,对应一个 {@link #statementList} 的 Statement 元素
 */
private final List<BatchResult> batchResultList = new ArrayList<>();
/**
 * 当前 SQL
 */
private String currentSql;
/**
 * 当前 MappedStatement 对象
 */
private MappedStatement currentStatement;

public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
}
  • currentSqlcurrentStatement 属性,当前 SQL 和 MappedStatement 对象。
  • batchResultListstatementList 属性,分别是 BatchResult 和 Statement 数组。并且,每一个 batchResultList 的 BatchResult 元素,对应一个 statementList 的 Statement 元素。
  • 具体怎么应用上述属性,我们见 #doUpdate(...)#doFlushStatements(...) 方法。

6.2 BatchResult

org.apache.ibatis.executor.BatchResult ,相同 SQL 聚合的结果。代码如下:

// BatchResult.java

/**
 * MappedStatement 对象
 */
private final MappedStatement mappedStatement;
/**
 * SQL
 */
private final String sql;
/**
 * 参数对象集合
 *
 * 每一个元素,对应一次操作的参数
 */
private final List<Object> parameterObjects;
/**
 * 更新数量集合
 *
 * 每一个元素,对应一次操作的更新数量
 */
private int[] updateCounts;

// ... 省略 setting / getting 相关方法

6.3 doUpdate

// BatchExecutor.java

@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    // <1> 创建 StatementHandler 对象
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    // <2> 如果匹配最后一次 currentSql 和 currentStatement ,则聚合到 BatchResult 中
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
        // <2.1> 获得最后一次的 Statement 对象
        int last = statementList.size() - 1;
        stmt = statementList.get(last);
        // <2.2> 设置事务超时时间
        applyTransactionTimeout(stmt);
        // <2.3> 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
        handler.parameterize(stmt);//fix Issues 322
        // <2.4> 获得最后一次的 BatchResult 对象,并添加参数到其中
        BatchResult batchResult = batchResultList.get(last);
        batchResult.addParameterObject(parameterObject);
    // <3> 如果不匹配最后一次 currentSql 和 currentStatement ,则新建 BatchResult 对象
    } else {
        // <3.1> 获得 Connection
        Connection connection = getConnection(ms.getStatementLog());
        // <3.2> 创建 Statement 或 PrepareStatement 对象
        stmt = handler.prepare(connection, transaction.getTimeout());
        // <3.3> 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
        handler.parameterize(stmt);    //fix Issues 322
        // <3.4> 重新设置 currentSql 和 currentStatement
        currentSql = sql;
        currentStatement = ms;
        // <3.5> 添加 Statement 到 statementList 中
        statementList.add(stmt);
        // <3.6> 创建 BatchResult 对象,并添加到 batchResultList 中
        batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    // handler.parameterize(stmt);
    // <4> 批处理
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
}
  • <1> 处,调用 Configuration#newStatementHandler(...) 方法,创建 StatementHandler 对象。

  • <2><3> 处,就是两种不同的情况,差异点在于传入的 sqlms 是否匹配当前的 currentSqlcurrentStatement 。如果是,则继续聚合到最后的 BatchResult 中,否则,创建新的 BatchResult 对象,进行“聚合”。

  • <2>
    

    块:

    • <2.1><2.2><2.3> 处,逻辑和 ReuseExecutor 相似,使用可重用的 Statement 对象,并进行初始化。
    • <2.4> 处,获得最后一次的 BatchResult 对象,并添加参数到其中。
  • <3>
    

    块:

    • <3.1><3.2><3.3> 处,逻辑和 SimpleExecutor 相似,创建新的 Statement 对象,并进行初始化。
    • <3.4> 处,重新设置 currentSqlcurrentStatement ,为当前传入的 sqlms
    • <3.5> 处,添加 Statement 到 statementList 中。
    • <3.6> 处,创建 BatchResult 对象,并添加到 batchResultList 中。
    • 那么,如果下一次执行这个方法,如果传递相同的 sqlms 进来,就会聚合到目前新创建的 BatchResult 对象中。
  • <4> 处,调用 StatementHandler#batch(Statement statement) 方法,批处理。代码如下:

    // 有多个实现类,先看两个
    
    // PreparedStatementHandler.java
    
    @Override
    public void batch(Statement statement) throws SQLException {
        PreparedStatement ps = (PreparedStatement) statement;
        ps.addBatch();
    }
    
    // SimpleStatementHandler.java
    
    @Override
    public void batch(Statement statement) throws SQLException {
        String sql = boundSql.getSql();
        statement.addBatch(sql);
    }
    
    • 这段代码,是不是非常熟悉。

6.4 doFlushStatements

// BatchExecutor.java

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
        // <1> 如果 isRollback 为 true ,返回空数组
        if (isRollback) {
            return Collections.emptyList();
        }
        // <2> 遍历 statementList 和 batchResultList 数组,逐个提交批处理
        List<BatchResult> results = new ArrayList<>();
        for (int i = 0, n = statementList.size(); i < n; i++) {
            // <2.1> 获得 Statement 和 BatchResult 对象
            Statement stmt = statementList.get(i);
            applyTransactionTimeout(stmt);
            BatchResult batchResult = batchResultList.get(i);
            try {
                // <2.2> 批量执行
                batchResult.setUpdateCounts(stmt.executeBatch());
                // <2.3> 处理主键生成
                MappedStatement ms = batchResult.getMappedStatement();
                List<Object> parameterObjects = batchResult.getParameterObjects();
                KeyGenerator keyGenerator = ms.getKeyGenerator();
                if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
                    Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
                    jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
                } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
                    for (Object parameter : parameterObjects) {
                        keyGenerator.processAfter(this, ms, stmt, parameter);
                    }
                }
                // Close statement to close cursor #1109
                // <2.4> 关闭 Statement 对象
                closeStatement(stmt);
            } catch (BatchUpdateException e) {
                // 如果发生异常,则抛出 BatchExecutorException 异常
                StringBuilder message = new StringBuilder();
                message.append(batchResult.getMappedStatement().getId())
                        .append(" (batch index #")
                        .append(i + 1)
                        .append(")")
                        .append(" failed.");
                if (i > 0) {
                    message.append(" ")
                            .append(i)
                            .append(" prior sub executor(s) completed successfully, but will be rolled back.");
                }
                throw new BatchExecutorException(message.toString(), e, results, batchResult);
            }
            // <2.5> 添加到结果集
            results.add(batchResult);
        }
        return results;
    } finally {
        // <3.1> 关闭 Statement 们
        for (Statement stmt : statementList) {
            closeStatement(stmt);
        }
        // <3.2> 置空 currentSql、statementList、batchResultList 属性
        currentSql = null;
        statementList.clear();
        batchResultList.clear();
    }
}
  • <1> 处,如果 isRollbacktrue ,返回空数组。

  • <2>
    

    处,遍历

    statementList
    

    batchResultList
    

    数组,逐个 Statement 提交批处理。

    • <2.1> 处,获得 Statement 和 BatchResult 对象。
    • 【重要】<2.2> 处,调用 Statement#executeBatch() 方法,批量执行。执行完成后,将结果赋值到 BatchResult.updateCounts 中。
    • <2.3> 处,处理主键生成。详细解析,见 《精尽 MyBatis 源码分析 —— SQL 执行(三)之 KeyGenerator》
    • <2.4> 处,调用 #closeStatement(stmt) 方法,关闭 Statement 对象。
    • <2.5> 处,添加到结果集。
  • <3.1> 处,关闭 Statement 们。

  • <3.2> 处,置空 currentSqlstatementListbatchResultList 属性。

6.5 doQuery

// BatchExecutor.java

@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
        throws SQLException {
    Statement stmt = null;
    try {
        // <1> 刷入批处理语句
        flushStatements();
        Configuration configuration = ms.getConfiguration();
        // 创建 StatementHandler 对象
        StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
        // 获得 Connection 对象
        Connection connection = getConnection(ms.getStatementLog());
        // 创建 Statement 或 PrepareStatement 对象
        stmt = handler.prepare(connection, transaction.getTimeout());
        // 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
        handler.parameterize(stmt);
        // 执行 StatementHandler  ,进行读操作
        return handler.query(stmt, resultHandler);
    } finally {
        // 关闭 StatementHandler 对象
        closeStatement(stmt);
    }
}
  • 和 SimpleExecutor 的该方法,逻辑差不多。差别在于 <1> 处,发生查询之前,先调用 #flushStatements() 方法,刷入批处理语句。

6.6 doQueryCursor

// BatchExecutor.java

@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    // <1> 刷入批处理语句
    flushStatements();
    Configuration configuration = ms.getConfiguration();
    // 创建 StatementHandler 对象
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    // 获得 Connection 对象
    Connection connection = getConnection(ms.getStatementLog());
    // 创建 Statement 或 PrepareStatement 对象
    Statement stmt = handler.prepare(connection, transaction.getTimeout());
    // 设置 Statement ,如果执行完成,则进行自动关闭
    stmt.closeOnCompletion();
    // 设置 SQL 上的参数,例如 PrepareStatement 对象上的占位符
    handler.parameterize(stmt);
    // 执行 StatementHandler  ,进行读操作
    return handler.queryCursor(stmt);
}
  • 和 SimpleExecutor 的该方法,逻辑差不多。差别在于 <1> 处,发生查询之前,先调用 #flushStatements() 方法,刷入批处理语句。

7. 二级缓存

在开始看具体源码之间,我们先来理解二级缓存的定义:

FROM 凯伦 《聊聊MyBatis缓存机制》

在上文中提到的一级缓存中,其最大的共享范围就是一个 SqlSession 内部,如果多个 SqlSession 之间需要共享缓存,则需要使用到二级缓存。开启二级缓存后,会使用 CachingExecutor 装饰 Executor ,进入一级缓存的查询流程前,先在 CachingExecutor 进行二级缓存的查询,具体的工作流程如下所示。

![大体流程](21-mybatis-SQL 执行(一)之 Executor.assets/04.png)大体流程

  • 那么,二级缓存,反应到具体代码里,是长什么样的呢?我们来打开 MappedStatement 类,代码如下:

    // MappedStatement.java
    
    /**
     * Cache 对象
     */
    private Cache cache;
    
    • 就是 cache 属性。在前面的文章中,我们已经看到,每个 XML Mapper 或 Mapper 接口的每个 SQL 操作声明,对应一个 MappedStatement 对象。通过 @CacheNamespace<cache /> 来声明,创建其所使用的 Cache 对象;也可以通过 @CacheNamespaceRef<cache-ref /> 来声明,使用指定 Namespace 的 Cache 对象。

    • 最终在 Configuration 类中的体现,代码如下:

      // Configuration.java
      
      /**
       * Cache 对象集合
       *
       * KEY命名空间 namespace
       */
      protected final Map<String, Cache> caches = new StrictMap<>("Caches collection");
      
      • 一个 KEY 为 Namespace 的 Map 对象。
    • 可能上述描述比较绕口,胖友好好理解下。

  • 通过在 mybatis-config.xml 中,配置如下开启二级缓存功能:

    <setting name="cacheEnabled" value="true"/>
    

7.1 CachingExecutor

org.apache.ibatis.executor.CachingExecutor ,实现 Executor 接口,支持二级缓存的 Executor 的实现类。

7.1.1 构造方法

// CachingExecutor.java

/**
 * 被委托的 Executor 对象
 */
private final Executor delegate;
/**
 * TransactionalCacheManager 对象
 */
private final TransactionalCacheManager tcm = new TransactionalCacheManager();

public CachingExecutor(Executor delegate) {
    // <1>
    this.delegate = delegate;
    // <2> 设置 delegate 被当前执行器所包装
    delegate.setExecutorWrapper(this);
}
  • tcm 属性TransactionalCacheManager 对象,支持事务的缓存管理器。因为二级缓存是支持跨 Session 进行共享,此处需要考虑事务,那么,必然需要做到事务提交时,才将当前事务中查询时产生的缓存,同步到二级缓存中。这个功能,就通过 TransactionalCacheManager 来实现。
  • <1> 处,设置 delegate 属性,为被委托的 Executor 对象。
  • <2> 处,调用 delegate 属性的 #setExecutorWrapper(Executor executor) 方法,设置 delegate当前执行器所包装。

7.1.2 直接调用委托方法

CachingExecutor 的如下方法,具体的实现代码,是直接调用委托执行器 delegate 的对应的方法。代码如下:

// CachingExecutor.java

public Transaction getTransaction() { return delegate.getTransaction(); }

@Override
public boolean isClosed() { return delegate.isClosed(); }

@Override
public List<BatchResult> flushStatements() throws SQLException { return delegate.flushStatements(); }

@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) { return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql); }

@Override
public boolean isCached(MappedStatement ms, CacheKey key) { return delegate.isCached(ms, key); }

@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) { delegate.deferLoad(ms, resultObject, property, key, targetType); }

@Override
public void clearLocalCache() { delegate.clearLocalCache(); }

7.1.3 query

// CachingExecutor.java

@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    // 获得 BoundSql 对象
    BoundSql boundSql = ms.getBoundSql(parameterObject);
    // 创建 CacheKey 对象
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    // 查询
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
        throws SQLException {
    // <1> 
    Cache cache = ms.getCache();
    if (cache != null) { // <2> 
        // <2.1> 如果需要清空缓存,则进行清空
        flushCacheIfRequired(ms);
        if (ms.isUseCache() && resultHandler == null) { // <2.2>
            // 暂时忽略,存储过程相关
            ensureNoOutParams(ms, boundSql);
            @SuppressWarnings("unchecked")
            // <2.3> 从二级缓存中,获取结果
            List<E> list = (List<E>) tcm.getObject(cache, key);
            if (list == null) {
                // <2.4.1> 如果不存在,则从数据库中查询
                list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
                // <2.4.2> 缓存结果到二级缓存中
                tcm.putObject(cache, key, list); // issue #578 and #116
            }
            // <2.5> 如果存在,则直接返回结果
            return list;
        }
    }
    // <3> 不使用缓存,则从数据库中查询
    return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
  • <1> 处,调用 MappedStatement#getCache() 方法,获得 Cache 对象,即当前 MappedStatement 对象的二级缓存

  • <3> 处,如果没有 Cache 对象,说明该 MappedStatement 对象,未设置二级缓存,则调用 delegate 属性的 #query(...) 方法,直接从数据库中查询。

  • <2> 处,如果 Cache 对象,说明该 MappedStatement 对象,有设置二级缓存

    • <2.1> 处,调用 #flushCacheIfRequired(MappedStatement ms) 方法,如果需要清空缓存,则进行清空。代码如下:

      // CachingExecutor.java
      
      private void flushCacheIfRequired(MappedStatement ms) {
          Cache cache = ms.getCache();
          if (cache != null && ms.isFlushCacheRequired()) { // 是否需要清空缓存
              tcm.clear(cache);
          }
      }
      
      • 通过 @Options(flushCache = Options.FlushCachePolicy.TRUE)<select flushCache="true"> 方式,开启需要清空缓存。
      • 调用 TransactionalCache#clear() 方法,清空缓存。注意,此清空的仅仅,当前事务中查询数据产生的缓存。而真正的清空,在事务的提交时。这是为什么呢?还是因为二级缓存是跨 Session 共享缓存,在事务尚未结束时,不能对二级缓存做任何修改。😈 可能有点绕,胖友好好理解。
    • <2.2> 处,当 MappedStatement#isUseCache() 方法,返回 true 时,才使用二级缓存。默认开启。可通过 @Options(useCache = false)<select useCache="false"> 方法,关闭。

    • <2.3> 处,调用 TransactionalCacheManager#getObject(Cache cache, CacheKey key) 方法,从二级缓存中,获取结果。

    • 如果不存在缓存

      • <2.4.1> 处,调用 delegate 属性的 #query(...) 方法,再从数据库中查询。
      • <2.4.2> 处,调用 TransactionalCacheManager#put(Cache cache, CacheKey key, Object value) 方法,缓存结果到二级缓存中。😈 当然,正如上文所言,实际上,此处结果还没添加到二级缓存中。那具体是怎么样的呢?答案见 TransactionalCache 。
    • 如果存在缓存

      • <2.5> 处,如果存在,则直接返回结果。

7.1.4 queryCursor

// CachingExecutor.java

@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
    // 如果需要清空缓存,则进行清空
    flushCacheIfRequired(ms);
    // 执行 delegate 对应的方法
    return delegate.queryCursor(ms, parameter, rowBounds);
}
  • 无法开启二级缓存,所以只好调用 delegate 对应的方法。

7.1.5 update

// CachingExecutor.java

@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
    // 如果需要清空缓存,则进行清空
    flushCacheIfRequired(ms);
    // 执行 delegate 对应的方法
    return delegate.update(ms, parameterObject);
}

7.1.6 commit

// CachingExecutor.java

@Override
public void commit(boolean required) throws SQLException {
    // 执行 delegate 对应的方法
    delegate.commit(required);
    // 提交 TransactionalCacheManager
    tcm.commit();
}
  • delegatetcm 先后提交。

7.1.7 rollback

// CachingExecutor.java

@Override
public void rollback(boolean required) throws SQLException {
    try {
        // 执行 delegate 对应的方法
        delegate.rollback(required);
    } finally {
        if (required) {
            // 回滚 TransactionalCacheManager
            tcm.rollback();
        }
    }
}
  • delegatetcm 先后回滚。

7.1.8 close

// CachingExecutor.java

@Override
public void close(boolean forceRollback) {
    try {
        //issues #499, #524 and #573
        // 如果强制回滚,则回滚 TransactionalCacheManager
        if (forceRollback) {
            tcm.rollback();
        // 如果强制提交,则提交 TransactionalCacheManager
        } else {
            tcm.commit();
        }
    } finally {
        // 执行 delegate 对应的方法
        delegate.close(forceRollback);
    }
}
  • 根据 forceRollback 属性,进行 tcmdelegate 对应的操作。

7.2 TransactionalCacheManager

org.apache.ibatis.cache.TransactionalCacheManager TransactionalCache 管理器。

7.2.1 构造方法

// TransactionalCacheManager.java

/**
 * Cache 和 TransactionalCache 的映射
 */
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
  • 我们可以看到,transactionalCaches 是一个使用 Cache 作为 KEY TransactionalCache 作为 VALUE 的 Map 对象。

  • 为什么是一个 Map 对象呢?因为在一次的事务过程中,可能有多个不同的 MappedStatement 操作,而它们可能对应多个 Cache 对象。

  • TransactionalCache 是怎么创建的呢?答案在 #getTransactionalCache(Cache cache) 方法,代码如下:

    // TransactionalCacheManager.java
    
    private TransactionalCache getTransactionalCache(Cache cache) {
        return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
    }
    
    • 优先,从 transactionalCaches 获得 Cache 对象,对应的 TransactionalCache 对象。
    • 如果不存在,则创建一个 TransactionalCache 对象,并添加到 transactionalCaches 中。

7.2.2 putObject

#putObject(Cache cache, CacheKey key, Object value) 方法,添加 Cache + KV ,到缓存中。代码如下:

// TransactionalCacheManager.java

public void putObject(Cache cache, CacheKey key, Object value) {
    // 首先,获得 Cache 对应的 TransactionalCache 对象
    // 然后,添加 KV 到 TransactionalCache 对象中
    getTransactionalCache(cache).putObject(key, value);
}

7.2.3 getObject

#getObject(Cache cache, CacheKey key) 方法,获得缓存中,指定 Cache + K 的值。代码如下:

// TransactionalCacheManager.java

public Object getObject(Cache cache, CacheKey key) {
    // 首先,获得 Cache 对应的 TransactionalCache 对象
    // 然后从 TransactionalCache 对象中,获得 key 对应的值
    return getTransactionalCache(cache).getObject(key);
}

7.2.4 clear

#clear() 方法,清空缓存。代码如下:

// TransactionalCacheManager.java

public void clear(Cache cache) {
    getTransactionalCache(cache).clear();
}

7.2.5 commit

#commit() 方法,提交所有 TransactionalCache 。代码如下:

// TransactionalCacheManager.java

public void commit() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
        txCache.commit();
    }
}
  • 通过调用该方法TransactionalCache 存储的当前事务的缓存,会同步到其对应的 Cache 对象。

7.2.6 rollback

#rollback() 方法,回滚所有 TransactionalCache 。代码如下:

// TransactionalCacheManager.java

public void rollback() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
        txCache.rollback();
    }
}

7.3 TransactionalCache

org.apache.ibatis.cache.decorators.TransactionalCache ,实现 Cache 接口,支持事务的 Cache 实现类,主要用于二级缓存中。英语比较好的胖友,可以看看如下注释:

This class holds all cache entries that are to be added to the 2nd level cache during a Session. Entries are sent to the cache when commit is called or discarded if the Session is rolled back. Blocking cache support has been added. Therefore any get() that returns a cache miss will be followed by a put() so any lock associated with the key can be released.

7.3.1 构造方法

// TransactionalCache.java

/**
 * 委托的 Cache 对象。
 *
 * 实际上,就是二级缓存 Cache 对象。
 */
private final Cache delegate;
/**
 * 提交时,清空 {@link #delegate}
 *
 * 初始时,该值为 false
 * 清理后{@link #clear()} 时,该值为 true ,表示持续处于清空状态
 */
private boolean clearOnCommit;
/**
 * 待提交的 KV 映射
 */
private final Map<Object, Object> entriesToAddOnCommit;
/**
 * 查找不到的 KEY 集合
 */
private final Set<Object> entriesMissedInCache;

public TransactionalCache(Cache delegate) {
    this.delegate = delegate;
    this.clearOnCommit = false;
    this.entriesToAddOnCommit = new HashMap<>();
    this.entriesMissedInCache = new HashSet<>();
}
  • 胖友认真看下每个变量上的注释。
  • 在事务未提交时,entriesToAddOnCommit 属性,会暂存当前事务新产生的缓存 KV 对。
  • 在事务提交时,entriesToAddOnCommit 属性,会同步到二级缓存 delegate 中。

7.3.2 getObject

// TransactionalCache.java

@Override
public Object getObject(Object key) {
    // issue #116
    // <1> 从 delegate 中获取 key 对应的 value
    Object object = delegate.getObject(key);
    // <2> 如果不存在,则添加到 entriesMissedInCache 中
    if (object == null) {
        entriesMissedInCache.add(key);
    }
    // issue #146
    // <3> 如果 clearOnCommit 为 true ,表示处于持续清空状态,则返回 null
    if (clearOnCommit) {
        return null;
    // <4> 返回 value
    } else {
        return object;
    }
}
  • <1> 处,调用 delegate#getObject(Object key) 方法,从 delegate 中获取 key 对应的 value 。
  • <2> 处,如果不存在,则添加到 entriesMissedInCache 中。这是个神奇的逻辑???答案见 commit()#rollback() 方法。
  • <3> 处,如果 clearOnCommittrue ,表示处于持续清空状态,则返回 null 。因为在事务未结束前,我们执行的清空缓存操作不好同步到 delegate 中,所以只好通过 clearOnCommit 来标记处于清空状态。那么,如果处于该状态,自然就不能返回 delegate 中查找的结果。
  • <4> 处,返回 value 。

7.3.3 putObject

#putObject(Object key, Object object) 方法,暂存 KV 到 entriesToAddOnCommit 中。代码如下:

// TransactionalCache.java

@Override
public void putObject(Object key, Object object) {
    // 暂存 KV 到 entriesToAddOnCommit 中
    entriesToAddOnCommit.put(key, object);
}

7.3.4 removeObject

// TransactionalCache.java

public Object removeObject(Object key) {
    return null;
}
  • 不太明白为什么是这样的实现。不过目前也暂时不存在调用该方法的情况。暂时忽略。

7.3.5 clear

#clear() 方法,清空缓存。代码如下:

// TransactionalCache.java

@Override
public void clear() {
    // <1> 标记 clearOnCommit 为 true
    clearOnCommit = true;
    // <2> 清空 entriesToAddOnCommit
    entriesToAddOnCommit.clear();
}
  • <1> 处,标记 clearOnCommittrue
  • <2> 处,清空 entriesToAddOnCommit
  • 该方法,不会清空 delegate 的缓存。真正的清空,在事务提交时。

7.3.6 commit

#commit() 方法,提交事务。重头戏,代码如下:

// TransactionalCache.java

public void commit() {
    // <1> 如果 clearOnCommit 为 true ,则清空 delegate 缓存
    if (clearOnCommit) {
        delegate.clear();
    }
    // 将 entriesToAddOnCommit、entriesMissedInCache 刷入 delegate 中
    flushPendingEntries();
    // 重置
    reset();
}
  • <1> 处,如果 clearOnCommittrue ,则清空 delegate 缓存。

  • <2> 处,调用 #flushPendingEntries() 方法,将 entriesToAddOnCommitentriesMissedInCache 同步到 delegate 中。代码如下:

    // TransactionalCache.java
    
    private void flushPendingEntries() {
        // 将 entriesToAddOnCommit 刷入 delegate 中
        for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
            delegate.putObject(entry.getKey(), entry.getValue());
        }
        // 将 entriesMissedInCache 刷入 delegate 中
        for (Object entry : entriesMissedInCache) {
            if (!entriesToAddOnCommit.containsKey(entry)) {
                delegate.putObject(entry, null);
            }
        }
    }
    
    • 在看这段代码时,笔者一直疑惑 entriesMissedInCache 同步到 delegate 中,会不会存在问题。因为当前事务未查找到,不代表其他事务恰好实际能查到。这样,岂不是会将缓存错误的置空。后来一想,缓存即使真的被错误的置空,最多也就多从数据库中查询一次罢了。😈
  • <3> 处,调用 #reset() 方法,重置对象。代码如下:

    // TransactionalCache.java
    
    private void reset() {
        // 重置 clearOnCommit 为 false
        clearOnCommit = false;
        // 清空 entriesToAddOnCommit、entriesMissedInCache
        entriesToAddOnCommit.clear();
        entriesMissedInCache.clear();
    }
    
    • 因为,一个 Executor 可以提交多次事务,而 TransactionalCache 需要被重用,那么就需要重置回初始状态。

7.3.7 rollback

#rollback() 方法,回滚事务。代码如下:

// TransactionalCache.java

public void rollback() {
    // <1> 从 delegate 移除出 entriesMissedInCache
    unlockMissedEntries();
    // <2> 重置
    reset();
}
  • <1> 处,调用 #unlockMissedEntries() 方法,将 entriesMissedInCache 同步到 delegate 中。代码如下:

    // TransactionalCache.java
    
    private void unlockMissedEntries() {
        for (Object entry : entriesMissedInCache) {
            try {
                delegate.removeObject(entry);
            } catch (Exception e) {
                log.warn("Unexpected exception while notifiying a rollback to the cache adapter."
                        + "Consider upgrading your cache adapter to the latest version.  Cause: " + e);
            }
        }
    }
    
    • 即使事务回滚,也不妨碍在事务的执行过程中,发现 entriesMissedInCache 不存在对应的缓存。
  • <2> 处,调用 #reset() 方法,重置对象。

8. 创建 Executor 对象

在上面的文章中,我们已经看了各种 Executor 的实现代码。那么Executor 对象究竟在 MyBatis 中是如何被创建的呢Configuration 类中,提供 #newExecutor(Transaction transaction, ExecutorType executorType) 方法,代码如下:

// Configuration.java

/**
 * 创建 Executor 对象
 *
 * @param transaction 事务对象
 * @param executorType 执行器类型
 * @return Executor 对象
 */
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    // <1> 获得执行器类型
    executorType = executorType == null ? defaultExecutorType : executorType; // 使用默认
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType; // 使用 ExecutorType.SIMPLE
    // <2> 创建对应实现的 Executor 对象
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
    } else {
        executor = new SimpleExecutor(this, transaction);
    }
    // <3> 如果开启缓存,创建 CachingExecutor 对象,进行包装
    if (cacheEnabled) {
        executor = new CachingExecutor(executor);
    }
    // <4> 应用插件
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
}
  • <1> 处,获得执行器类型。可以通过在 mybatis-config.xml 配置文件,如下:

    // value 有三种类型SIMPLE REUSE BATCH
    <setting name="defaultExecutorType" value="" />
    
  • org.apache.ibatis.session.ExecutorType ,执行器类型。代码如下:

    // ExecutorType.java
    
    public enum ExecutorType {
    
        /**
         * {@link org.apache.ibatis.executor.SimpleExecutor}
         */
        SIMPLE,
        /**
         * {@link org.apache.ibatis.executor.ReuseExecutor}
         */
        REUSE,
        /**
         * {@link org.apache.ibatis.executor.BatchExecutor}
         */
        BATCH
    
    }
    
  • <2> 处,创建对应实现的 Executor 对象。默认为 SimpleExecutor 对象。

  • <3> 处,如果开启缓存,创建 CachingExecutor 对象,进行包装。

  • <4> 处,应用插件。关于插件,我们在后续的文章中,详细解析。

9. ClosedExecutor

老艿艿:写到凌晨 1 点多,以为已经写完了,结果发现….

在 ResultLoaderMap 类中,有一个 ClosedExecutor 内部静态类,继承 BaseExecutor 抽象类,已经关闭的 Executor 实现类。代码如下:

// ResultLoaderMap.java

private static final class ClosedExecutor extends BaseExecutor {

    public ClosedExecutor() {
        super(null, null);
    }

    @Override
    public boolean isClosed() {
        return true;
    }

    @Override
    protected int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
        throw new UnsupportedOperationException("Not supported.");
    }

    @Override
    protected List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
        throw new UnsupportedOperationException("Not supported.");
    }

    @Override
    protected <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
        throw new UnsupportedOperationException("Not supported.");
    }

    @Override
    protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
        throw new UnsupportedOperationException("Not supported.");
    }
}
  • 仅仅在 ResultLoaderMap 中,作为一个“空”的 Executor 对象。没有什么特殊的意义和用途。

10. ErrorContext

org.apache.ibatis.executor.ErrorContext ,错误上下文,负责记录错误日志。代码比较简单,也蛮有意思,胖友可以自己研究研究。

另外,也可以参考下 《Mybatis 源码中获取到的错误日志灵感》

666. 彩蛋

困的来,写到大半夜 1 点多。回家偷懒了下,果然不行~~~

参考和推荐如下文章: