04、ShardingJDBC实战:JDBC实现

一 序

看源码的过程,有两个维度,主要是根据执行过程去看。好处是可以有个大的流程图。对于有遗漏的点,可以按照宝结构再扫一遍,查漏补缺。上一篇主要整理了分库分表及主要暴露的API。本边继续从jdbc规范整理,从jdbc的实现,柔性事务,读写分离,路由,结果合并。最后整理模块最多也是核心的SQL解析。感谢芋道源码。http://www.iocoder.cn/Sharding-JDBC/jdbc-implement-and-read-write-splitting/

对于jdbc的规范,shardignjdbc我理解就是封装了规范的类,然后将对象的方法进行重写,而ShardingDataSource、ShardingConnection、ShardingStatement、ShardingPreparedStament类可以看作一个代理对象,它们内部完成了根据逻辑sql语句,转化成真实sql语句,路由找到对应数据源,并根据数据源产生真实的Connection对象,然后执行真实sql语句,并将结果归并返回给应用层的功能。

二 包结构

看一下源码的总体结构:

*

unsupported:声明不支持的数据操作方法
adapter:适配类,实现和分库分表无关的方法

core:核心类,实现和分库分表相关的方法。下面是子包封装的DataSource、Connection、resultset、Statement。

以下图片来自芋道的博客:

datasource:

*

Connection

*

resultset

*

statement

*

看着层级比较多。实现层级关系:JDBC 接口 <=(继承)== unsupported抽象类 <=(继承)== unsupported抽象类 <=(继承)== core类。

三 核心Jdbc方法

3.1 unspported 包

unspported 包内的抽象类,声明不支持操作的数据对象,所有方法都是 throw new SQLFeatureNotSupportedException() 方式。

public abstract class AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter implements PreparedStatement {
    
    public AbstractUnsupportedOperationPreparedStatement() {
        super(PreparedStatement.class);
    }
    
    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        throw new SQLFeatureNotSupportedException("getMetaData");
    }

3.2 WrapperAdapter

对Wrapper 接口实现如下两个方法:

@SuppressWarnings("unchecked")
    @Override
    public final <T> T unwrap(final Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return (T) this;
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }
    
    @Override
    public final boolean isWrapperFor(final Class<?> iface) throws SQLException {
        return iface.isInstance(this);
    }

核心方法:提供子类 #recordMethodInvocation() 记录方法调用,#replayMethodsInvocation() 回放记录的方法调用:

 private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
 /**
     * 记录方法调用.
     * 
     * @param targetClass 目标类
     * @param methodName 方法名称
     * @param argumentTypes 参数类型
     * @param arguments 参数
     */
    public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
        try {
            jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
        } catch (final NoSuchMethodException ex) {
            throw new ShardingJdbcException(ex);
        }
    }
    
    /**
     * 回放记录的方法调用.
     * 
     * @param target 目标对象
     */
    public final void replayMethodsInvocation(final Object target) {
        for (JdbcMethodInvocation each : jdbcMethodInvocations) {
            each.invoke(target);
        }
    }

记录了调用的方法与传入方法的参数,实际是将JdbcMethodInvocation对象放入WrapperAdapter的成员变量。下面再看如何保存的。

用途举例: AbstractConnectionAdapter 的 #setAutoCommit(),先记录,后回放(在获取真正的connection时)。

   @Override
    public final void setAutoCommit(final boolean autoCommit) throws SQLException {
        this.autoCommit = autoCommit;
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        for (Connection each : cachedConnections.values()) {
            each.setAutoCommit(autoCommit);
        }
    }

回放代码:在ShardingConnection.getConnection()

为啥这么设计呢?简单理解就是(当它无数据库连接时,先记录;等获得到数据连接后,再回放)。后面细说。

3.3 JdbcMethodInvocation

上面说的记录到 JdbcMethodInvocation,看看jdbcMethodInvocations记录了所有调用的方法与参数。

public final class JdbcMethodInvocation {  
    private final Method method;//调用的方法  
    private final Object[] arguments;//调用方法时传入的参数  
      
    /** 
     *  调用方法. 
     *  
     * @param target 目标对象 
     */  
    public void invoke(final Object target) {  
        try {  
            method.invoke(target, arguments);  
        } catch (final IllegalAccessException | InvocationTargetException ex) {  
            throw new ShardingJdbcException("Invoke jdbc method exception", ex);  
        }  
    }  
}  

使用了反射。

3.4 核心流程举例

上面说的3.2,3.3是jdbc的核心类,怎么串联起来执行的呢?找个例子看下具体流程

看下jdbc的规范的:https://docs.oracle.com/javase/tutorial/jdbc/basics/index.html

ShardingPreparedStatement的executeUpdate方法:

下面图来自芋道博客:

*

@Override
    public int executeUpdate() throws SQLException {
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            return new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeUpdate();
        } finally {
            clearBatch();
        }
    }

总体上分两部分:1 route() 分库分表路由,获得预编译语句对象执行单元集合:Collection

2PreparedStatementExecutor.executeUpdate 就是上图的步骤11、12

    /**
     * Execute update.
     * 
     * @return effected records count
     * @throws SQLException SQL exception
     */
    public int executeUpdate() throws SQLException {
        List<Integer> results = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Integer>() {
            
            @Override
            public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
            }
        });
        return accumulate(results);
    }

再来看看调用细节。

route

private Collection<PreparedStatementUnit> route() throws SQLException {
        Collection<PreparedStatementUnit> result = new LinkedList<>();
        //设置路由
        routeResult = routingEngine.route(getParameters());
        //遍历SQL执行单元
        for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
            SQLType sqlType = routeResult.getSqlStatement().getType();
            Collection<PreparedStatement> preparedStatements;
             // 创建实际的 PreparedStatement
            if (SQLType.DDL == sqlType) {
                preparedStatements = generatePreparedStatementForDDL(each);
            } else {
                //上面的DDL少用。generatePreparedStatement是常用创建
                preparedStatements = Collections.singletonList(generatePreparedStatement(each));
            }
            routedStatements.addAll(preparedStatements);
             // 回放设置占位符参数到 PreparedStatement
            for (PreparedStatement preparedStatement : preparedStatements) {
                replaySetParameter(preparedStatement);
                result.add(new PreparedStatementUnit(each, preparedStatement));
            }
        }
        return result;
    }
 private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
   Optional<GeneratedKey> generatedKey = getGeneratedKey();
   // 获得连接
   Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
   // 声明返回主键,因为算法缺陷通常不用框架自带的,自己设置
   if (isReturnGeneratedKeys() || isReturnGeneratedKeys() && generatedKey.isPresent()) {
       return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS);
   }
   return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}

调用 #generatePreparedStatement() 创建 PreparedStatement,后调用 #replaySetParameter() 回放设置占位符参数到 PreparedStatement。声明返回主键一般很少用,基于snowflake的算法需要自己改造。

再看获取连接部分代码,调用 ShardingConnection#getConnection() 方法获得该 PreparedStatement 对应的真实数据库连接( Connection ):

 public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
        if (getCachedConnections().containsKey(dataSourceName)) {
            return getCachedConnections().get(dataSourceName);
        }
        DataSource dataSource = shardingContext.getShardingRule().getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the rule of %s in DataSourceRule", dataSourceName);
        String realDataSourceName;
        if (dataSource instanceof MasterSlaveDataSource) {
            NamedDataSource namedDataSource = ((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
            realDataSourceName = namedDataSource.getName();
            if (getCachedConnections().containsKey(realDataSourceName)) {
                return getCachedConnections().get(realDataSourceName);
            }
            dataSource = namedDataSource.getDataSource();
        } else {
            realDataSourceName = dataSourceName;
        }
        Connection result = dataSource.getConnection();
        getCachedConnections().put(realDataSourceName, result);
        replayMethodsInvocation(result);
        return result;
    }

调用 #getCachedConnection() 尝试获得已缓存的数据库连接;如果缓存存在直接返回.其中: #getCachedConnections()来自ShardingConnection父类AbstractConnectionAdapter。属性为

@Getter
private final Map<String, Connection> cachedConnections = new HashMap<>();

而connectionMap 为保存真实Connection的Map,其key为配置文件中真实dataSource Bean的id。connectionMap 的初始值为空。在调用ShardingConnection的getConnection方法得到真实的Connection对象后,才会将其实放connectionMap 中。

从ShardingRule 配置的 DataSourceRule 获取真实的数据源( DataSource )
MasterSlaveDataSource 判断是否主从,实现主从数据源封装
调用 #replayMethodsInvocation() 回放记录的 Connection 方法,这里就有上面的说的回放代码。

我们再看看获取数据源部分:

ShardingJDBC框架通过配置文件指定ShardingDataSource与外部应用程序交互,达到访问真实数据源的目的。其有三个成员变量

public class ShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
    
    private ShardingProperties shardingProperties;
    
    private ExecutorEngine executorEngine;
    
    private ShardingContext shardingContext;

shardingProperties为一些属性,executorEngine为一个多线程的执行引擎其内部采用Guava的ListeningExecutorService实现。shardingContext为上下文信息,记录了分库分表规则配置、执行引擎、sql路由引擎这些信息。以便ShardingConnection 使用。
利用ShardingJDBC框架后通过DataSource得到的Connection实际上是包装后的ShardingConnection。ShardingDataSource对DataSource的所有方法进行了重写,其中getConnection如下:

  @Override
    public ShardingConnection getConnection() throws SQLException {
        return new ShardingConnection(shardingContext);
    }

得到得到的Connection实际上是包装后的ShardingConnection,而Sharding上下文对象在创建ShardingConnection时被传入。

public final class ShardingConnection extends AbstractConnectionAdapter {
    
    @Getter
    private final ShardingContext shardingContext;

再回到上面的示例:当我们调用con.setAutoCommit方法时,实际是调用ShardingConnection重写后的setAutoCommit方法。该方法在其父类AbstractConnectionAdapter中完成重写。

 public final void setAutoCommit(final boolean autoCommit) throws SQLException {
        this.autoCommit = autoCommit;
        //记录调用过的方法与对应参数,当在产生真正的Connection对象时,  
       //再利用反射的方式让真正的Connection对象调用该方法  
        recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
        for (Connection each : cachedConnections.values()) {
            each.setAutoCommit(autoCommit);
        }
    }

除了setAutoCommit方法外以下的方式也是通过这JdbcMethodInvocation实现的。
AbstractConnectionAdapter类:setReadOnly,setTransactionIsolation

总结:

通过ShardingDataSource类得到的ShardingConnection不是真的JDBC Connection对象,其父类AbstractConnectionAdapter内部有一个Map<String, Connection> cachedConnections 。key为数据源名,value为真实的Connection。当调用Connection类的一些方法时,实际是调用了ShardingConnection 重写后的方法,而此时connectionMap的还是空,达不到调用真实Connection对象方法的功能,为此在ShardingConnection 重写后的方法方法中将调用的方法与方法的参数记录到JdbcMethodInvocation对象中。只有当ShardingPreparedStatement或者ShardingStatement调用对应数据操作函数时(executeQuery,execute,executeUpdate)才会调用ShardingConnection的getContetion方法产生真实的Connection对象,同时把这些对象放入connectionMap。为此在调用ShardingConnection 的setAutoCommint方法、setReadOnly方法、setTransactionIsolation方法时,要记录一下所有调用过的方法的名称与参数。当通过ShardingPreparedStatement或者ShardingStatement产生一个实际的Connection时,再通过反射的方式让真实的Connection对象调用JdbcMethodInvocation记录的方法与方法的参数。

参考:

https://blog.csdn.net/jackyechina/article/details/53010667

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: