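05. ShardingJDBC in Practice: SQL Routing Implementation

1 Preface

The previous post walked through the JDBC flow: an update takes roughly two steps, routing and execution, while a query additionally merges the result sets.

This post picks up from routing. It has two parts: an overview of the sharding rule objects, and a source-code walkthrough of the route package.

This post is based mainly on Sharding-JDBC 2.0.3. Thanks to 芋道源码: http://www.iocoder.cn/Sharding-JDBC/sql-route-2/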

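2 Database and Table Sharding

This part supplements the second post in the series, which covered the API. It assumes you already know the relevant concepts or have read the official manual.

2.1 TableRule (in the rule package)

The table rule configuration object.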

public TableRule(final String logicTable, final List<String> actualDataNodes, final Map<String, DataSource> dataSourceMap,
                     final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy, 
                     final String generateKeyColumn, final KeyGenerator keyGenerator, final String logicIndex) {
        this.logicTable = logicTable.toLowerCase();
        this.actualDataNodes = null == actualDataNodes || actualDataNodes.isEmpty() ? generateDataNodes(logicTable, dataSourceMap) : generateDataNodes(actualDataNodes, dataSourceMap);
        this.databaseShardingStrategy = databaseShardingStrategy;
        this.tableShardingStrategy = tableShardingStrategy;
        this.generateKeyColumn = generateKeyColumn;
        this.keyGenerator = keyGenerator;
        this.logicIndex = null == logicIndex ? null : logicIndex.toLowerCase();
    }

A TableRule is created by TableRuleConfiguration.build.

2.2 DataNode: the sharding data unit

 private List<DataNode> generateDataNodes(final String logicTable, final Map<String, DataSource> dataSourceMap) {
        List<DataNode> result = new LinkedList<>();
        for (String each : dataSourceMap.keySet()) {
            result.add(new DataNode(each, logicTable)); // no actual data nodes configured: use the logic table name in every data source
        }
        return result;
    }
    // Actual (physical) tables
    private List<DataNode> generateDataNodes(final List<String> actualDataNodes, final Map<String, DataSource> dataSourceMap) {
        List<DataNode> result = new LinkedList<>();
        for (String each : actualDataNodes) {
            DataNode dataNode = new DataNode(each);
            Preconditions.checkArgument(dataSourceMap.containsKey(dataNode.getDataSourceName()), String.format("Cannot find data source in sharding rule, invalid actual data node is: '%s'", each));
            result.add(dataNode);
        }
        return result;
    }
    public DataNode(final String dataNode) {
        Preconditions.checkArgument(DataNode.isValidDataNode(dataNode), String.format("Invalid format for actual data nodes: '%s'", dataNode));
        List<String> segments = Splitter.on(DELIMITER).splitToList(dataNode);
        dataSourceName = segments.get(0);
        tableName = segments.get(1);
    }

A data node is configured as ${dataSourceName}.${tableName}. The constructor splits that string on "." to obtain the data source name and the table name.
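The parsing described above can be sketched without Guava. This is a minimal stand-in, not the library class: DataNodeSketch is a hypothetical name, and the validity rule (exactly one "." with text on both sides) is an assumption, since isValidDataNode is not shown in the excerpt.

```java
import java.util.Objects;

/** Simplified, hypothetical stand-in for DataNode: parses "dataSourceName.tableName". */
final class DataNodeSketch {

    private static final String DELIMITER = ".";

    final String dataSourceName;

    final String tableName;

    DataNodeSketch(final String dataNode) {
        Objects.requireNonNull(dataNode, "dataNode");
        int index = dataNode.indexOf(DELIMITER);
        // Assumed rule: exactly one delimiter, with non-empty text on both sides.
        if (index <= 0 || index == dataNode.length() - 1 || index != dataNode.lastIndexOf(DELIMITER)) {
            throw new IllegalArgumentException(String.format("Invalid format for actual data nodes: '%s'", dataNode));
        }
        dataSourceName = dataNode.substring(0, index);
        tableName = dataNode.substring(index + 1);
    }

    public static void main(final String[] args) {
        DataNodeSketch node = new DataNodeSketch("ds_0.t_order_1");
        System.out.println(node.dataSourceName + " / " + node.tableName);
    }
}
```

A string without a delimiter, such as "t_order", is rejected with an IllegalArgumentException, which mirrors the Preconditions.checkArgument call in the excerpt.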

2.3 ShardingRule

The sharding rule configuration object. It has the following fields:

public final class ShardingRule {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final String defaultDataSourceName;
    
    private final Collection<TableRule> tableRules;
    
    private final Collection<BindingTableRule> bindingTableRules = new LinkedList<>();
    
    private final ShardingStrategy defaultDatabaseShardingStrategy;
    
    private final ShardingStrategy defaultTableShardingStrategy;
    
    private final KeyGenerator defaultKeyGenerator;

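dataSourceMap: the data sources
defaultDataSourceName: name of the default data source
tableRules: the table sharding rules
bindingTableRules: the binding table rule configuration objects
defaultDatabaseShardingStrategy: default database sharding strategy
defaultTableShardingStrategy: default table sharding strategy
defaultKeyGenerator: primary key generator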

   this.defaultDatabaseShardingStrategy = null == defaultDatabaseShardingStrategy ? new NoneShardingStrategy() : defaultDatabaseShardingStrategy;
        this.defaultTableShardingStrategy = null == defaultTableShardingStrategy ? new NoneShardingStrategy() : defaultTableShardingStrategy;

In the constructor, if no database or table sharding strategy is supplied, NoneShardingStrategy is used as the default.

3 SQL Routing

[Figure: overall structure of the source package]

3.1 SQLRouteResult

After SQL parsing and SQL routing, the outcome is the SQL route result, SQLRouteResult. The SQL to run is generated from this result and then executed.


public final class SQLRouteResult {
    
    private final SQLStatement sqlStatement;
    
    private final Set<SQLExecutionUnit> executionUnits = new LinkedHashSet<>();
    
    private final List<Number> generatedKeys = new LinkedList<>();
}

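sqlStatement: the SQL statement object, that is, the result of SQL parsing.
executionUnits: the set of minimal SQL execution units. Executing the SQL means executing each unit.
generatedKeys: the primary keys generated for an INSERT statement.

3.2 Sharding strategies

Take StandardShardingStrategy as an example: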

 @SuppressWarnings("unchecked")
    private Collection<String> doSharding(final Collection<String> availableTargetNames, final ListShardingValue<?> shardingValue) {
        Collection<String> result = new LinkedList<>();
        for (PreciseShardingValue<?> each : transferToPreciseShardingValues(shardingValue)) {
            result.add(preciseShardingAlgorithm.doSharding(availableTargetNames, each));
        }
        return result;
    }

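As shown, the actual sharding is delegated to the algorithm's doSharding method. The algorithms Sharding-JDBC supports are as follows:

[Figure: sharding algorithms supported by Sharding-JDBC]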
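To make the delegation concrete, here is a minimal sketch of what a precise (single-value) algorithm might do. PreciseAlgorithmSketch and ModuloAlgorithmSketch are hypothetical names; the interface is a simplified stand-in that takes a plain long rather than the library's PreciseShardingValue, and the "_<index>" naming of targets is an assumption.

```java
import java.util.Collection;

/** Hypothetical, simplified stand-in for a precise sharding algorithm. */
interface PreciseAlgorithmSketch {

    String doSharding(Collection<String> availableTargetNames, long shardingValue);
}

/** Picks the target whose name ends with "_" + (value modulo number of targets). */
final class ModuloAlgorithmSketch implements PreciseAlgorithmSketch {

    @Override
    public String doSharding(final Collection<String> availableTargetNames, final long shardingValue) {
        // floorMod keeps the index non-negative even for negative sharding values.
        String suffix = "_" + Math.floorMod(shardingValue, (long) availableTargetNames.size());
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        throw new UnsupportedOperationException("No target matches suffix " + suffix);
    }
}
```

With targets t_order_0 and t_order_1, a sharding value of 11 selects t_order_1. The strategy in the excerpt calls such an algorithm once per value of an IN list and collects the results.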

3.3 Routing flow

[Figure: routing flow diagram, from 芋道源码]

    public SQLRouteResult route(final List<Object> parameters) {
        if (null == sqlStatement) {
            sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
        }
        return sqlRouter.route(logicSQL, parameters, sqlStatement);
    }

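This is route, the core method of PreparedStatementRoutingEngine.

SQLRouter is the SQL router interface. It has two implementations:
DatabaseHintSQLRouter: a router that routes by hint, and only to the database level
ParsingSQLRouter: a router that needs to parse the SQL

Both implement #parse() for SQL parsing and #route() for SQL routing.

RoutingEngine is the routing engine interface. It has 6 implementations:

[Figure: the RoutingEngine implementations]

Their internals are covered later.

There are two kinds of routing result:

RoutingResult: a simple routing result
CartesianRoutingResult: a Cartesian-product routing result

How do SQLRouteResult and RoutingResult differ?

SQLRouteResult: the result returned by the whole SQL routing process

RoutingResult: the result returned by a RoutingEngine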

***********************

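DatabaseHintSQLRouter is hint-based and is not covered here; it will be dealt with in a separate post on hints. The rest of this post looks at ParsingSQLRouter.

4 ParsingSQLRouter

ParsingSQLRouter is the SQL router that needs to parse the SQL.

ParsingSQLRouter parses the SQL with SQLParsingEngine.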

@Override
    public SQLStatement parse(final String logicSQL, final int parametersSize) {
        SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
        SQLStatement result = parsingEngine.parse();
        if (result instanceof InsertStatement) {
            ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
        }
        return result;
    }
 private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
        Collection<String> tableNames = sqlStatement.getTables().getTableNames();
        RoutingEngine routingEngine;
        if (sqlStatement instanceof DDLStatement) {
            routingEngine = new DDLRoutingEngine(shardingRule, parameters, (DDLStatement) sqlStatement); 
        } else if (tableNames.isEmpty()) {
            routingEngine = new DatabaseAllRoutingEngine(shardingRule.getDataSourceMap());
        } else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
            routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
        } else {
            // TODO config for cartesian set
            routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
        }
        return routingEngine.route();
    }

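When routing, ParsingSQLRouter picks a routing engine according to the tables involved:

DDLRoutingEngine: for DDL statements, which are relatively rare.
DatabaseAllRoutingEngine: when the statement contains no table names.

SimpleRoutingEngine: used when the SQL has a single table name, when all the tables are binding tables of one another, or when all the tables live in the default data source (that is, tables not involved in sharding). When tables are bound to each other, every table routes to the same result, so only the first table's shards need to be computed. The isAllBindingTables check is fairly long; see the source for how it decides.

ComplexRoutingEngine: everything else.

The following sections cover the two most common engine types: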

4.1 SimpleRoutingEngine


@Override
public RoutingResult route() {
    // Look up the TableRule for the logic table
    TableRule tableRule = shardingRule.getTableRule(logicTableName);
    // Route data sources first, using the database sharding strategy (for example, modulo on the sharding key)
    Collection<String> routedDataSources = routeDataSources(tableRule);
    // routedMap holds the routing targets: key is a data source, value is the collection of tables routed to within it
    Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
    // Iterate over the routed data sources
    for (String each : routedDataSources) {
        // Then route tables within that data source, using the table sharding strategy
        routedMap.put(each, routeTables(tableRule, each));
    }
    // Wrap the routed data sources and tables into a RoutingResult. RoutingResult has a TableUnits field, TableUnits holds a List<TableUnit> tableUnits, and each TableUnit has three fields: dataSourceName, logicTableName, and actualTableName
    return generateRoutingResult(tableRule, routedMap);
}

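The number of table units to execute is the sum, over the routed data sources, of the actual tables routed within each one; when every data source routes to the same number of tables, that is (routed data sources) × (routed actual tables). Stepping through the call chain yourself is worthwhile.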
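The database-then-table order can be illustrated with a small sketch. Everything here is hypothetical: SimpleRoutingSketch, route, and pick are made-up names, both strategies are assumed to be plain modulo, and a null sharding value stands for "no condition on that column", which routes to every target.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SimpleRoutingSketch {

    /** Returns "dataSource.actualTable" units, routing data sources first and tables second. */
    static List<String> route(final List<String> dataSources, final List<String> tables, final Long userId, final Long orderId) {
        // Step 1: route data sources (assumed database strategy: userId modulo data source count).
        Collection<String> routedDataSources = pick(dataSources, userId);
        // Step 2: for each routed data source, route tables (assumed table strategy: orderId modulo table count).
        Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
        for (String each : routedDataSources) {
            routedMap.put(each, pick(tables, orderId));
        }
        // Step 3: flatten the map into one unit per (data source, actual table) pair.
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Collection<String>> entry : routedMap.entrySet()) {
            for (String table : entry.getValue()) {
                result.add(entry.getKey() + "." + table);
            }
        }
        return result;
    }

    private static Collection<String> pick(final List<String> targets, final Long shardingValue) {
        if (null == shardingValue) {
            // No sharding condition: every target is a candidate.
            return targets;
        }
        int index = (int) Math.floorMod(shardingValue.longValue(), (long) targets.size());
        return Collections.singletonList(targets.get(index));
    }
}
```

With two data sources and two tables, supplying both values yields a single unit, supplying only the table value yields one unit per data source, and supplying neither yields all four.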

4.2 ComplexRoutingEngine

ComplexRoutingEngine is the routing engine for statements that mix several tables across databases.

public final class ComplexRoutingEngine implements RoutingEngine {
    
    private final ShardingRule shardingRule; // sharding rule
    
    private final List<Object> parameters; // SQL parameters
    
    private final Collection<String> logicTables; // logic tables
    
    private final SQLStatement sqlStatement; // SQL parse result
    
    @Override
    public RoutingResult route() {
        Collection<RoutingResult> result = new ArrayList<>(logicTables.size());
        Collection<String> bindingTableNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (String each : logicTables) { // iterate over the logic tables
            Optional<TableRule> tableRule = shardingRule.tryFindTableRule(each);
            if (tableRule.isPresent()) {
                // Route the table only if it was not already covered as a binding table of an earlier table
                if (!bindingTableNames.contains(each)) {
                    result.add(new SimpleRoutingEngine(shardingRule, parameters, tableRule.get().getLogicTable(), sqlStatement).route());
                }
                // Find all tables bound to the current logic table and record them, so they are skipped later
                Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each);
                if (bindingTableRule.isPresent()) {
                    bindingTableNames.addAll(Lists.transform(bindingTableRule.get().getTableRules(), new Function<TableRule, String>() {
                        
                        @Override
                        public String apply(final TableRule input) {
                            return input.getLogicTable();
                        }
                    }));
                }
            }
        }
        log.trace("mixed tables sharding result: {}", result);
        // Complex routing that produced no result at all: throw
        if (result.isEmpty()) {
            throw new ShardingJdbcException("Cannot find table rule and default data source with logic tables: '%s'", logicTables);
        }
        if (1 == result.size()) { // a single routing result needs no Cartesian product
            return result.iterator().next();
        }
        // Hand over to CartesianRoutingEngine to build the Cartesian-product result
        return new CartesianRoutingEngine(result).route();
    }
}
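The binding-table bookkeeping in the loop above is easy to miss, so here is a minimal sketch of just that part. BindingSkipSketch and its methods are hypothetical names; binding groups are passed in as plain lists, and every table is assumed to have a table rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class BindingSkipSketch {

    /** Returns the logic tables that would be routed on their own, in encounter order. */
    static List<String> tablesToRoute(final List<String> logicTables, final List<List<String>> bindingGroups) {
        List<String> result = new ArrayList<>();
        // Case-insensitive set of tables already covered by an earlier table's binding group.
        Set<String> covered = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (String each : logicTables) {
            if (!covered.contains(each)) {
                result.add(each);
            }
            for (List<String> group : bindingGroups) {
                if (containsIgnoreCase(group, each)) {
                    covered.addAll(group);
                }
            }
        }
        return result;
    }

    private static boolean containsIgnoreCase(final List<String> names, final String name) {
        for (String each : names) {
            if (each.equalsIgnoreCase(name)) {
                return true;
            }
        }
        return false;
    }
}
```

For tables t_order, t_order_item, and t_user, with t_order and t_order_item bound together, only t_order and t_user are routed; t_order_item reuses the routing of t_order.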

4.3 CartesianRoutingEngine

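As analysed above, once the simple routing results are available, their Cartesian product is the final result of complex routing. The core source of the Cartesian routing engine, CartesianRoutingEngine, is as follows: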

public final class CartesianRoutingEngine implements RoutingEngine {
    
    private final Collection<RoutingResult> routingResults;

    @Override
    public CartesianRoutingResult route() {
        CartesianRoutingResult result = new CartesianRoutingResult();
        for (Entry<String, Set<String>> entry : getDataSourceLogicTablesMap().entrySet()) { // Entry<data source, Set<logic table>>
            // Get the actual-table groups for the current data source
            List<Set<String>> actualTableGroups = getActualTableGroups(entry.getKey(), entry.getValue()); // List<Set<actual table>>
            // A TableUnit holds data source name, logic table name, and actual table name; together they identify the table to access
            List<Set<TableUnit>> tableUnitGroups = toTableUnitGroups(entry.getKey(), actualTableGroups);
            // Take the Cartesian product and merge it into the result
            result.merge(entry.getKey(), getCartesianTableReferences(Sets.cartesianProduct(tableUnitGroups)));
        }
        log.trace("cartesian tables sharding result: {}", result);
        return result;
    }

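Step 1: get the logic tables that share a data source, that is, Entry<data source, Set<logic table>> entry.
Step 2: iterate over the data sources and get the table unit groups for the current data source.

Step 3: take the Cartesian product of the table unit groups and merge it into the routing result.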
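Step 3 relies on Guava's Sets.cartesianProduct. To show what that product looks like for one data source, the sketch below builds it with the standard library only. CartesianSketch is a hypothetical name, and table names are plain strings rather than TableUnit objects.

```java
import java.util.ArrayList;
import java.util.List;

final class CartesianSketch {

    /** Builds every combination that takes exactly one element from each group, in group order. */
    static List<List<String>> cartesianProduct(final List<List<String>> groups) {
        List<List<String>> result = new ArrayList<>();
        // Start from a single empty combination and extend it group by group.
        result.add(new ArrayList<String>());
        for (List<String> group : groups) {
            List<List<String>> next = new ArrayList<>();
            for (List<String> prefix : result) {
                for (String each : group) {
                    List<String> combined = new ArrayList<>(prefix);
                    combined.add(each);
                    next.add(combined);
                }
            }
            result = next;
        }
        return result;
    }
}
```

Two groups of two actual tables each, say t_order_0 and t_order_1 against t_order_item_0 and t_order_item_1, give four combinations. Each combination corresponds to one CartesianTableReference, and therefore to one SQL statement executed in that data source.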

// Step 1
// CartesianRoutingEngine.java

private Map<String, Set<String>> getDataSourceLogicTablesMap() {
   Collection<String> intersectionDataSources = getIntersectionDataSources();
   Map<String, Set<String>> result = new HashMap<>(routingResults.size());
   // Collect the logic tables that live in the same data source
   for (RoutingResult each : routingResults) {
       for (Entry<String, Set<String>> entry : each.getTableUnits().getDataSourceLogicTablesMap(intersectionDataSources).entrySet()) { // drops logic tables outside the data source intersection
           if (result.containsKey(entry.getKey())) {
               result.get(entry.getKey()).addAll(entry.getValue());
           } else {
               result.put(entry.getKey(), entry.getValue());
           }
       }
   }
   // The result maps each data source to its logic tables
   return result;
}
private Collection<String> getIntersectionDataSources() {
        Collection<String> result = new HashSet<>();
        for (RoutingResult each : routingResults) {
            if (result.isEmpty()) {
                result.addAll(each.getTableUnits().getDataSourceNames());
            }
            result.retainAll(each.getTableUnits().getDataSourceNames()); // intersection
        }
        return result;
    }

The key point of this step is taking the intersection of the data sources across all routing results.
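The intersection can be sketched with Collection.retainAll. IntersectionSketch is a hypothetical name, and the input is simplified to one collection of data source names per routing result.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class IntersectionSketch {

    /** Returns the data source names present in every routing result. */
    static Set<String> intersect(final List<? extends Collection<String>> dataSourceNamesPerResult) {
        Set<String> result = new HashSet<>();
        boolean first = true;
        for (Collection<String> each : dataSourceNamesPerResult) {
            if (first) {
                // Seed the set with the first result's data sources.
                result.addAll(each);
                first = false;
            } else {
                // Keep only the names that this result also contains.
                result.retainAll(each);
            }
        }
        return result;
    }
}
```

One design difference is deliberate: the sketch uses an explicit first-iteration flag instead of the result.isEmpty() check in the excerpt, so an intersection that becomes empty partway through stays empty rather than being re-seeded by the next routing result.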

Step 2

    private List<Set<String>> getActualTableGroups(final String dataSource, final Set<String> logicTables) {
        List<Set<String>> result = new ArrayList<>(logicTables.size());
        for (RoutingResult each : routingResults) {
            result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables));
        }
        return result;
    }
private List<Set<TableUnit>> toTableUnitGroups(final String dataSource, final List<Set<String>> actualTableGroups) {
        List<Set<TableUnit>> result = new ArrayList<>(actualTableGroups.size());
        for (Set<String> each : actualTableGroups) {
            result.add(new HashSet<>(Lists.transform(new ArrayList<>(each), new Function<String, TableUnit>() {
    
                @Override
                public TableUnit apply(final String input) {
                    return findTableUnit(dataSource, input);
                }
            })));
        }
        return result;
    }
private List<CartesianTableReference> getCartesianTableReferences(final Set<List<TableUnit>> cartesianTableUnitGroups) {
        List<CartesianTableReference> result = new ArrayList<>(cartesianTableUnitGroups.size());
        for (List<TableUnit> each : cartesianTableUnitGroups) {
            result.add(new CartesianTableReference(each));
        }
        return result;
    }
public final class CartesianRoutingResult extends RoutingResult {
    
    @Getter
    private final List<CartesianDataSource> routingDataSources = new ArrayList<>();
    
    void merge(final String dataSource, final Collection<CartesianTableReference> routingTableReferences) {
        for (CartesianTableReference each : routingTableReferences) {
            merge(dataSource, each);
        }
    }
    
    private void merge(final String dataSource, final CartesianTableReference routingTableReference) {
        for (CartesianDataSource each : routingDataSources) {
            if (each.getDataSource().equalsIgnoreCase(dataSource)) {
                each.getRoutingTableReferences().add(routingTableReference);
                return;
            }
        }
        routingDataSources.add(new CartesianDataSource(dataSource, routingTableReference));
    }

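merge() merges Cartesian routing results. A CartesianRoutingResult contains several CartesianDataSource objects, so each CartesianTableReference has to be merged (added) into the CartesianDataSource it belongs to. That said, the current implementation already generates the CartesianTableReference objects per data source.

Finally, back to the main #route() of ParsingSQLRouter: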

 @Override
    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        SQLRouteResult result = new SQLRouteResult(sqlStatement);
        // Handle the generated primary key of an INSERT
        if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
            processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
        }
        // Route
        RoutingResult routingResult = route(parameters, sqlStatement);
        // SQL rewrite engine
        SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement);
        boolean isSingleRouting = routingResult.isSingleRouting();
        // Handle pagination (LIMIT)
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
            processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
        }
        // Rewrite the SQL
        SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
        // Build the SQLExecutionUnits
        if (routingResult instanceof CartesianRoutingResult) {
            for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
                for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
                    result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
                }
            }
        } else {
            for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
                result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
            }
        }
        // Log the SQL
        if (showSQL) {
            SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
        }
        return result;
    }

This code looks long, but stepping through it in a debugger and checking against the actual tables makes it much clearer.
