09、ShardingJDBC实战:groupby结果合并

4 GroupByStreamResultSetMerger

GroupByStreamResultSetMerger,基于 Stream 方式分组归并结果集实现。 它继承自 OrderByStreamResultSetMerger,在排序的逻辑上,实现分组功能。实现原理也较为简单:

public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {
    
    private final Map<String, Integer> labelAndIndexMap;    
    private final SelectStatement selectStatement;    
    private final List<Object> currentRow;
    
    private List<?> currentGroupByValues;
    public GroupByStreamResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
        // GroupByStreamResultSetMerger的父类是OrderByStreamResultSetMerger,所以调用super()就是调用OrderByStreamResultSetMerger的构造方法
        super(resultSets, selectStatement.getOrderByItems());
        // 标签(列名)和位置索引的map关系,例如{order_id:1, status:3, user_id:2}        
        this.labelAndIndexMap = labelAndIndexMap;
        // 执行的SQL语句
        this.selectStatement = selectStatement;
        currentRow = new ArrayList<>(labelAndIndexMap.size());
        // 如果优先级队列不为空,表示where条件中有group by,将队列中第一个元素的group值赋值给currentGroupByValues,(默认升序排列)
        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
    }

currentRow 为当前结果记录,使用 #getValue()、#getCalendarValue() 方法获得当前结果记录的查询列值。
currentGroupByValues 为下一条结果记录 GROUP BY 条件,通过 GroupByValue 生成:

 public final class GroupByValue {
    
    private final List<?> groupValues;
    
    public GroupByValue(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
        groupValues = getGroupByValues(resultSet, groupByItems);
    }
    
    private List<?> getGroupByValues(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
        List<Object> result = new ArrayList<>(groupByItems.size());
        for (OrderItem each : groupByItems) {
            result.add(resultSet.getObject(each.getIndex()));// 从结果集获得每个分组条件的值
        }
        }
        return result;
    }

4.1 核心方法next()

// GroupByStreamResultSetMerger.java
@Override
public boolean next() throws SQLException {
   // 清除当前结果记录
   currentRow.clear();
   // 如果优先级队列为空,表示没有任何结果,那么返回false
   if (getOrderByValuesQueue().isEmpty()) {
       return false;
   }
   //
   if (isFirstNext()) {
       super.next();
   }
   //集合的核心逻辑:顺序合并下面相同分组条件的记录
   if (aggregateCurrentGroupByRowAndNext()) {
       // 生成下一条结果记录 GROUP BY 条件
       currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
   }
   return true;
}
private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
        boolean result = false;
        //生成计算单元 selectStatement.getAggregationSelectItems()先得到select所有举行类型的项,
        Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
            
            @Override
            public AggregationUnit apply(final AggregationSelectItem input) {
                return AggregationUnitFactory.create(input.getType());
            }
        });
        //接下来准备聚合:循环顺序合并下面相同分组条件的记录
        while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
            // 归并聚合值
            aggregate(aggregationUnitMap);
            //缓存当前记录到结果记录
            cacheCurrentRow();
            //获取下一条记录,实际调用OrderByStreamResultSetMerger中的next()方法,currentResultSet会指向下一个元素;
            result = super.next();
            //如果还有值,那么继续遍历
            if (!result) {
                break;
            }
            //设置当前记录的聚合字段结果
        setAggregationValueToCurrentRow(aggregationUnitMap);
        return result;
    }

其中涉及的源码实现有:

public final class AggregationUnitFactory {
    
    /**
     * Create aggregation unit instance.
     * 
     * @param type aggregation function type
     * @return aggregation unit instance
     */
    public static AggregationUnit create(final AggregationType type) {
        switch (type) {
            case MAX:
                return new ComparableAggregationUnit(false);
            case MIN:
                return new ComparableAggregationUnit(true);
            case SUM:
            case COUNT:
                return new AccumulationAggregationUnit();
            case AVG:
                return new AverageAggregationUnit();
            default:
                throw new UnsupportedOperationException(type.name());
        }
    }
}

上面可知select中MAX和MIN这种聚合查询需要使用ComparableAggregationUnit,SUM和COUNT需要使用AccumulationAggregationUnit,AVG需要使用AverageAggregationUnit;(目前shardingjdbc只支持这些聚合操作).

aggregate()源码如下:

private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {
    for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
        List<Comparable<?>> values = new ArrayList<>(2);
        if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {
            values.add(getAggregationValue(entry.getKey()));
        } else {
            for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) {
                values.add(getAggregationValue(each));
            }
        }
        // aggregate()的核心就是调用AggregationUnit具体实现中的merge()方法,即调用AccumulationAggregationUnit.merge()方法(后面会对AggregationUnit的各个实现进行分析)
        entry.getValue().merge(values);
    }
}

再来看下具体的执行流程:

*

好吧。看这块还是不如价格例子清晰:下面例子来自阿飞Javaer

step1. SQL执行
首先在两个实际表t_order_0t_order_1中分别执行SQL:SELECT o.status, count(o.user_id) FROM t_order o where o.user_id=10 group by o.statust_order_0t_order_1分别得到如下的结果:

status count(o.user_id)
INIT 3
NEW 1
VALID 1
status count(o.user_id)
INIT 2
NEW 2
VALID 1

step2. 执行super(***)
即在GroupByStreamResultSetMerger中调用OrderByStreamResultSetMerger的构造方法super(resultSets, selectStatement.getOrderByItems());,从而得到优先级队列,如下图所示的第一张图,优先级中包含两个元素[(INIT, 3), (INIT 2)]:

1、 先聚合计算(INIT,3)和(INIT,2),由于NEW和INIT不相等,进行下一轮聚合计算;
2、 再聚合计算(NEW,1)和(NEW,2),由于VALID和NEW不相等,进行下一轮聚合计算;
3、 再聚合计算(VALID,1)和(VALID,1),两者的next()为false,聚合计算完成;

step3. aggregationUnitMap
通过转换得到aggregationUnitMap,key就是count(user_id),value就是COUNT聚合计算的AggregationUnit实现,即AccumulationAggregationUnit;

step4 . 循环遍历并merge
核心代码如下,即将(INIT, 3)和(INIT, 2)通过调用AccumulationAggregationUnit中的merge方法,从而得到(INIT, 5)。同样的原因调用AccumulationAggregationUnit中的merge方法merge(NEW, 1)和(NEW, 2),从而得到(NEW, 3);merge(VALID, 1)和(VALID, 1),从而得到(VALID, 2)。所以,最终的结果就是[(INIT, 5), (NEW, 3), (VALID, 2)]

4.2 AggregationUnit

AggregationUnit,归并计算单元接口,有两个接口方法:

  • #merge():归并聚合值
  • #getResult():获取计算结果

一共有三个实现类:

5. GroupByMemoryResultSetMerger

GroupByMemoryResultSetMerger,基于 内存 分组归并结果集实现。

我们通过上篇 结果合并可以知道。如果要走GroupByMemoryResultSetMerger,那么需要这样的SQL:SELECT o.status, count(o.user_id) count_user_id FROM t_order o where o.user_id=10 group by o.status order by count_user_id asc,即group by和order by的字段不一样。

区别于GroupByStreamResultSetMerger,其无法使用每个分片结果集的有序的特点,只能在内存中合并后,进行整个重新排序。因而,性能和内存都较 GroupByStreamResultSetMerger 会差。

举个例子:

在实际表t_order_0上执行SQL返回的结果如下:

status count_user_id
NEW 1
VALID 1
INIT 3

在实际表t_order_1上执行SQL返回的结果如下:

status count_user_id
VALID 1
INIT 2
NEW 2

GroupByMemoryResultSetMerger的构造方法源码如下:

private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {
        // 分组条件值与内存记录映射
        Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024);
         // 分组条件值与聚合列映射
        Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024);
        // 遍历结果集
        for (ResultSet each : resultSets) {
            while (each.next()) {
                // 生成分组条件:selectStatement.getGroupByItems()即group by项,将结果和group by项组成一个GroupByValue对象--实际是从ResultSet中取出group by项的值
                GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());
                // 初始化分组条件到 dataMap、aggregationMap 映射
                initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);
                 // 归并聚合值
                aggregate(each, groupByValue, aggregationMap);
            }
        }
          // 设置聚合列结果到内存记录datamap
        setAggregationValueToMemoryRow(dataMap, aggregationMap);
          // 设置聚合列结果到内存记录(将结果转换成List<MemoryResultSetRow>形式)
        List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);
        if (!result.isEmpty()) {
             // 如果有结果,再将currentResultSetRow置为List<MemoryResultSetRow>的第一个元素
            setCurrentResultSetRow(result.get(0));
        }
        //返回List<MemoryResultSetRow>的迭代器,目的为了迭代这个集合
        return result.iterator();
    }

#initForFirstGroupByValue() 初始化分组条件到 dataMap,aggregationMap 映射中,这样可以调用 #aggregate() 将聚合值归并到 aggregationMap 里的该分组条件。

 private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, 
                                          final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
         //初始化分组条件到 dataMap:
         //groupByValue如果是第一次出现,那么在dataMap中初始化一条数据,key就是groupByValue,例如NEW;
         //value就是new MemoryResultSetRow(resultSet),即将ResultSet中的结果取出来封装到MemoryResultSetRow中,
         //MemoryResultSetRow实际就一个属性Object[] data,那么data值就是这样的["NEW", 1]   
        if (!dataMap.containsKey(groupByValue)) {
            dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));
        }
        //初始化分组条件到 aggregationMap:
        //groupByValue如果是第一次出现,那么在aggregationMap中初始化一条数据,key就是groupByValue,例如NEW;
        //value又是一个map,这个map的key就是select中有聚合计算的列,例如count(user_id),即count_user_id;value就是AggregationUnit的实现,count聚合计算的实现是AccumulationAggregationUnit
        if (!aggregationMap.containsKey(groupByValue)) {
            Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
                
                @Override
                public AggregationUnit apply(final AggregationSelectItem input) {
                   // 根据聚合计算类型得到AggregationUnit的实现
                    return AggregationUnitFactory.create(input.getType());
                }
            });
            aggregationMap.put(groupByValue, map);
        }
    }

这是为聚合做准备,看看aggregate()的实现

private void aggregate(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
    // 遍历select中所有的聚合类型,例如COUNT(o.user_id)
    for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) {
        List<Comparable<?>> values = new ArrayList<>(2);
        if (each.getDerivedAggregationSelectItems().isEmpty()) {
            values.add(getAggregationValue(resultSet, each));
        } else {
            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
                values.add(getAggregationValue(resultSet, derived));
            }
        }
        // 通过AggregationUnit实现类即AccumulationAggregationUnit进行聚合,实际上就是聚合本次遍历到的ResultSet,聚合的临时结果就在AccumulationAggregationUnit的属性result中(AccumulationAggregationUnit聚合的本质就是累加)
        aggregationMap.get(groupByValue).get(each).merge(values);
    }
}

*

再来结合例子看看上面的流程。

经过for (ResultSet each : resultSets) { while (each.next()) { ... 遍历所有结果并聚合计算后,aggregationMap这个map中已经聚合计算完后的结果,如下所示:

{
    "VALID": {
        "COUNT(user_id)": 2
    },
    "INIT": {
        "COUNT(user_id)": 5
    },
    "NEW": {
        "COUNT(user_id)": 3
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

再将aggregationMap中的结果封装到Map<GroupByValue, MemoryResultSetRow> dataMap这个map中,结果形式如下所示:

{
    "VALID": ["VALID", 2],
    "INIT": ["INIT", 5],
    "NEW": ["NEW", 3]
}
  • 1
  • 2
  • 3
  • 4
  • 5

MemoryResultSetRow的本质就是一个Object[] data,所以其值是[“VALID”, 2],[“INIT”, 5]这种形式

*

*

5.1 next()

public boolean next() throws SQLException {
        if (memoryResultSetRows.hasNext()) {
            setCurrentResultSetRow(memoryResultSetRows.next());
            return true;
        }
        return false;
    }

内存归并完成后,使用 memoryResultSetRows 不断获得下一条记录

总结:GroupByMemoryResultSetMerger的名字一样,其实现原理是把所有结果加载到内存中,在内存中进行计算,耗内存如果SQL返回的总结果数比较多存在内存溢出可能,性能差一些。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: