10、ShardingJDBC实战:最大努力型事务

一 序:

Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。目前支持的:

  • Best efforts delivery transaction (已经实现).
  • Try confirm cancel transaction (待定).

最大努力送达型:在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。

*

实际上可以看看图上的流程,不管执行结果如何,执行前事件都会记录事务日志;执行事件类型包括3种:BEFORE_EXECUTE,EXECUTE_FAILURE和EXECUTE_SUCCESS;另外,这里的”同步“不是绝对的同步执行,而是通过google-guava的EventBus发布事件后,在监听端判断是EXECUTE_FAILURE事件,最多重试syncMaxDeliveryTryTimes次;

适用场景

  • 根据主键删除数据。
  • 更新记录永久状态,如更新通知送达状态。

使用限制

使用最大努力送达型柔性事务的SQL需要满足幂等性。

  • INSERT语句要求必须包含主键,且不能是自增主键。
  • UPDATE语句要求幂等,不能是UPDATE xxx SET x=x+1
  • DELETE语句无要求。

这里看看官网demo:http://shardingjdbc.io/document/legacy/2.x/cn/02-guide/transaction/

整个过程通过如下 组件 完成:
柔性事务管理器
最大努力送达型柔性事务
最大努力送达型事务监听器
事务日志存储器

最大努力送达型异步作业

二 柔性事务管理器

之前的《 SQL执行》对ExecutorEngine的分析可知,sharding-jdbc在执行SQL前后,分别调用EventBusInstance.getInstance().post()提交了事件,那么调用EventBusInstance.getInstance().register()的地方,就是柔性事务处理的地方,通过查看源码的调用关系可知,只有SoftTransactionManager.init()调用了EventBusInstance.getInstance().register(),所以柔性事务实现的核心在SoftTransactionManager这里;

2.1 SoftTransactionManager

柔性事务管理器,SoftTransactionManager 实现,负责对柔性事务配置( SoftTransactionConfiguration ) 、柔性事务( AbstractSoftTransaction )的管理。

public final class SoftTransactionManager {

    private static final String TRANSACTION = "transaction";

    private static final String TRANSACTION_CONFIG = "transactionConfig";

    @Getter
    private final SoftTransactionConfiguration transactionConfig;
    
    /**
     * Initialize B.A.S.E transaction manager.
     * 
     * @throws SQLException SQL exception
     */
    public void init() throws SQLException {
        // 初始化 最大努力送达型事务监听器
        EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
        // 初始化 事务日志数据库存储表
        if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
            Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
            createTable();
        }
         // 初始化 内嵌的最大努力送达型异步作业
        if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
            new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
        }
    }

通过上面的代码,可知这里的涉及的重点如下:

将最大努力送达型事务监听器( BestEffortsDeliveryListener )注册到事务总线 ( EventBus ),
当使用数据库存储事务日志( TransactionLog ) 时,若事务日志表( transaction_log )不存在则进行创建.

当配置使用内嵌的最大努力送达型异步作业( NestedBestEffortsDeliveryJob ) 时,进行初始化

2.2 SoftTransactionConfiguration

柔性事务配置对象

public class SoftTransactionConfiguration {
    
    /**
     * Data source for transaction manager.
     */
    @Getter(AccessLevel.NONE)
    private final DataSource targetDataSource;
    
    /**
     * Max synchronized delivery try times.
     */
    private int syncMaxDeliveryTryTimes = 3;
    
    /**
     * Transaction log storage type.
     */
    private TransactionLogDataSourceType storageType = RDB;
    
    /**
     * Transaction log data source.
     */
    private DataSource transactionLogDataSource;
    
    /**
     * Embed best efforts delivery B.A.S.E transaction asynchronized job configuration.
     */
    private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();

2.3 柔性事务

在Sharding-JDBC 里,目前柔性事务分成两种:

BEDSoftTransaction :最大努力送达型柔性事务
TCCSoftTransaction :TCC型柔性事务
继承AbstractSoftTransaction

public abstract class AbstractSoftTransaction {
    
    private boolean previousAutoCommit;
    
    @Getter
    private ShardingConnection connection;
    
    @Getter
    private SoftTransactionType transactionType;
    
    @Getter
    private String transactionId;

提供了开始事务beginInternal,结束事务end供子类调用。

protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
        // TODO 判断如果在传统事务中,则抛异常
        Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");
        //   设置执行错误,不抛出异常
        ExecutorExceptionHandler.setExceptionThrown(false);
        connection = (ShardingConnection) conn;
        transactionType = type;
        //设置自动提交状态
        previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(true);
        // TODO replace to snowflake:以后用snowflake生成事务编号替换uuid
        transactionId = UUID.randomUUID().toString();
    }

注意点:1 。SQL异常不抛出,会继续执行。

2、 自动提交,所以不支持回滚;

	public final void end() throws SQLException {
        if (null != connection) {
            ExecutorExceptionHandler.setExceptionThrown(true);
            connection.setAutoCommit(previousAutoCommit);
            SoftTransactionManager.closeCurrentTransactionManager();
        }
    }
     /**
     * Close transaction manager from current thread.
     */
    static void closeCurrentTransactionManager() {
        ExecutorDataMap.getDataMap().put(TRANSACTION, null);
        ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
    }

end作用是事务结束后清理线程变量。

BEDSoftTransaction

public class BEDSoftTransaction extends AbstractSoftTransaction {
    
    /**
     * Begin transaction.
     * 
     * @param connection database connection
     * @throws SQLException SQL exception
     */
    public void begin(final Connection connection) throws SQLException {
        beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
    }
}

2.4 创建柔性事务

通过调用 SoftTransactionManager.getTransaction() 创建柔性事务对象:

public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
        AbstractSoftTransaction result;
        switch (type) {
            case BestEffortsDelivery: 
                result = new BEDSoftTransaction();
                break;
            case TryConfirmCancel:
                result = new TCCSoftTransaction();
                break;
            default: 
                throw new UnsupportedOperationException(type.toString());
        }
        // TODO don't support nested transaction(嵌套事务), should configurable in future
        if (getCurrentTransaction().isPresent()) {
            throw new UnsupportedOperationException("Cannot support nested transaction.");
        }
        ExecutorDataMap.getDataMap().put(TRANSACTION, result);
        ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
        return result;
    }

获取柔性事务配置:

public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
        Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
        return (null == transactionConfig)
                ? Optional.<SoftTransactionConfiguration>absent()
                : Optional.of((SoftTransactionConfiguration) transactionConfig);
    }
public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
        Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
        return (null == transaction)
                ? Optional.<AbstractSoftTransaction>absent()
                : Optional.of((AbstractSoftTransaction) transaction);
    }

3 事务日志存储器

柔性事务执行过程中,会通过事务日志( TransactionLog ) 记录每条 SQL 执行状态:
SQL执行前,记录一条事务日志
SQL执行成功,移除对应的事务日志
通过实现事务日志存储器接口( TransactionLogStorage ),提供存储功能。有两个实现类:
1、 RdbTransactionLogStorage:关系型数据库存储柔性事务日志;
2、 MemoryTransactionLogStorage:内存存储柔性事务日志;

3.1 TransactionLogStorage

public interface TransactionLogStorage {
    
    /**
     * Save transaction log.
     * 
     * @param transactionLog transaction log
     */
    void add(TransactionLog transactionLog);
    
    /**
     * Remove transaction log.
     * 
     * @param id transaction log id
     */
    void remove(String id);
    
    /**
     * Find eligible transaction logs.
     * 
     * <p>To be processed transaction logs: </p>
     * <p>1. retry times less than max retry times.</p>
     * <p>2. transaction log last retry timestamp interval early than last retry timestamp.</p>
     * 
     * @param size size of fetch transaction log
     * @param maxDeliveryTryTimes max delivery try times
     * @param maxDeliveryTryDelayMillis max delivery try delay millis
     * @return eligible transaction logs
     */
    List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);
    
    /**
     * Increase asynchronized delivery try times.
     * 
     * @param id transaction log id
     */
    void increaseAsyncDeliveryTryTimes(String id);
    
    /**
     * Process transaction logs.
     *
     * @param connection connection for business app
     * @param transactionLog transaction log
     * @param maxDeliveryTryTimes max delivery try times
     * @return process success or not
     */
    boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);
}

注释的比较清晰了,翻一下:

TransactionLogStorage中几个重要接口在两个实现类中的实现:
* void add(TransactionLog):Rdb实现就是把事务日志TransactionLog 插入到transaction_log表中,Memory实现就是把事务日志保存到ConcurrentHashMap中;
* void remove(String id):Rdb实现就是从transaction_log表中删除事务日志,Memory实现从ConcurrentHashMap中删除事务日志;
* void increaseAsyncDeliveryTryTimes(String id):异步增加送达重试次数,即TransactionLog中的asyncDeliveryTryTimes+1;Rdb实现就是update transaction_log表中async_delivery_try_times字段加1;Memory实现就是TransactionLog中重新给asyncDeliveryTryTimes赋值new AtomicInteger(transactionLog.getAsyncDeliveryTryTimes()).incrementAndGet();
* findEligibleTransactionLogs(): 查询需要处理的事务日志,条件是:①异步处理次数async_delivery_try_times小于参数最大处里次数maxDeliveryTryTimes,②transaction_type是BestEffortsDelivery,③系统当前时间与事务日志的创建时间差要超过参数maxDeliveryTryDelayMillis,每次最多查询参数size条;Rdb实现通过sql从transaction_log表中查询,Memory实现遍历ConcurrentHashMap匹配符合条件的TransactionLog;
* boolean processData():Rdb实现执行TransactionLog中的sql,如果执行过程中抛出异常,那么调用increaseAsyncDeliveryTryTimes()增加送达重试次数并抛出异常,如果执行成功,删除事务日志,并返回true;Memory实现直接返回false(因为processData()的目的是执行TransactionLog中的sql,而Memory类型无法触及数据库,所以返回false)

3.2 RdbTransactionLogStorage 接口实现源码

public final class RdbTransactionLogStorage implements TransactionLogStorage {
    
    private final DataSource dataSource;
    
    @Override
    public void add(final TransactionLog transactionLog) {
        String sql = "INSERT INTO transaction_log (id, transaction_type, data_source, sql, parameters, creation_time) VALUES (?, ?, ?, ?, ?, ?);";
        try (...
    }
        
    public void remove(final String id) {
      String sql = "DELETE FROM transaction_log WHERE id=?;";
      ...
    }
    public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
    //最多去size条
    List<TransactionLog> result = new ArrayList<>(size);
    String sql = "SELECT id, transaction_type, data_source, sql, parameters, creation_time, async_delivery_try_times "
        + "FROM transaction_log WHERE async_delivery_try_times`<? AND transaction_type=? AND creation_time`<? LIMIT ?;";
    try (Connection conn = dataSource.getConnection()) {  
    。。。。
    }
    public void increaseAsyncDeliveryTryTimes(final String id) {
    // 更新处理次数+1
    String sql = "UPDATE transaction_log SET async_delivery_try_times=async_delivery_try_times+1 WHERE id=?;";
    ...
    }
    public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
        try (
            Connection conn = connection;
            // 重试执行TransactionLog中的sql
            PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
            for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
                preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
            }
            preparedStatement.executeUpdate();
        } catch (final SQLException ex) {
            //如果抛出异常,表示执行sql失败,那么把增加处理次数并把异常抛出去;
            increaseAsyncDeliveryTryTimes(transactionLog.getId());
            throw new TransactionCompensationException(ex);
        }
        // 如果没有抛出异常,表示执行sql成功,那么删除该事务日志;
        remove(transactionLog.getId());
        return true;
    }
  • 该方法会被最大努力送达型异步作业调用到

TransactionLog (transaction_log) 数据库表结构如下:

字段 名字 数据库类型 备注
id 事件编号 VARCHAR(40) EventBus 事件编号,非事务编号
transaction_type 柔性事务类型 VARCHAR(30)
data_source 真实数据源名 VARCHAR(255)  
sql 执行 SQL TEXT 已经改写过的 SQL
parameters 占位符参数 TEXT JSON 字符串存储
creation_time 记录时间 LONG
async_delivery_try_times 已异步重试次数 INT

4. 最大努力送达型事务监听器

最大努力送达型事务监听器,BestEffortsDeliveryListener,负责记录事务日志、同步重试执行失败 SQL。

public final class BestEffortsDeliveryListener {
    
    @Subscribe
    @AllowConcurrentEvents
    //从方法可知,只监听DML执行事件
    public void listen(final DMLExecutionEvent event) {
        //判断是否需要继续,判断逻辑为:事务存在,并且是BestEffortsDelivery类型事务
        if (!isProcessContinuously()) {
            return;
        }
        // 从柔性事务管理器中得到柔性事务配置
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        //得到配置的柔性事务存储器
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        //得到最大努力送达型事务
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
          // 根据事件类型做不同处理
        switch (event.getEventExecutionType()) {
              // 如果执行前事件,那么先保存事务日志;
            case BEFORE_EXECUTE:
                //TODO for batch SQL need split to 2-level records
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
                 // 如果执行成功事件,那么删除事务日志;
            case EXECUTE_SUCCESS: 
                transactionLogStorage.remove(event.getId());
                return;
                //执行失败,同步重试
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    //如果执行成功,那么返回,不需要再尝试
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                         // 获得数据库连接
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                        // 通过执行"select 1"判断conn是否是有效的数据库连接;如果不是有效的数据库连接,释放掉并重新获取一个数据库连接;
                        // 为啥呢?因为可能执行失败是数据库连接异常,所以再判断一次
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //同步重试,
                        //TODO for batch event need split to 2-level records(对于批量事件需要解析成两层列表)
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        //因为只监控DML,所以调用executeUpdate()
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        //执行成功;根据id删除事务日志;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        // 如果sql执行有异常,那么输出error日志
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                    // 关闭链接
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
               // 默认对于支持三种事件类型之外的,抛出异常
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
    private boolean isProcessContinuously() {
        return SoftTransactionManager.getCurrentTransaction().isPresent()
                && SoftTransactionType.BestEffortsDelivery == SoftTransactionManager.getCurrentTransaction().get().getTransactionType();
    }

BestEffortsDeliveryListener 通过 EventBus 实现监听 SQL 的执行。Sharding-JDBC 而是通过google-guava的EventBus发布事件。具体可以结合《SQL执行》来看。
QL执行前,插入事务日志
SQL执行成功,移除事务日志

SQL执行失败,根据柔性事务配置( SoftTransactionConfiguration )同步的事务送达的最大尝试次数( syncMaxDeliveryTryTimes )进行多次重试直到成功。

最后进行关闭链接close()

private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
        if (null != preparedStatement) {
            try {
                preparedStatement.close();
            } catch (final SQLException ex) {
                log.error("PreparedStatement closed error:", ex);
            }
        }
        if (isNewConnection && null != conn) {
            try {
                conn.close();
            } catch (final SQLException ex) {
                log.error("Connection closed error:", ex);
            }
        }
    }

5 最大努力送达型异步作业

当最大努力送达型事务监听器( BestEffortsDeliveryListener )多次同步重试失败后,交给最大努力送达型异步作业进行多次异步重试,并且多次执行有固定间隔。
Sharding-JDBC 提供了两个最大努力送达型异步作业实现:
NestedBestEffortsDeliveryJob :内嵌的最大努力送达型异步作业
BestEffortsDeliveryJob :最大努力送达型异步作业

逻辑类似,只是前者无法实现高可用,可以在测试环境用。

5.1 BestEffortsDeliveryJob

核心源码在模块sharding-jdbc-transaction-async-job中。该模块是一个独立异步处理模块,使用者决定是否需要启用,源码比较少。宝结构如下图所示:

*

Main方法的核心源码如下:

public static void main(final String[] args) throws Exception {
    // CHECKSTYLE:ON
        try (InputStreamReader inputStreamReader = new InputStreamReader(BestEffortsDeliveryJobMain.class.getResourceAsStream("/conf/config.yaml"), "UTF-8")) {
            BestEffortsDeliveryConfiguration config = new Yaml(new Constructor(BestEffortsDeliveryConfiguration.class)).loadAs(inputStreamReader, BestEffortsDeliveryConfiguration.class);
            new BestEffortsDeliveryJobFactory(config).init();
        }
    }

由源码可知,主配置文件是config.yaml;将该文件解析为BestEffortsDeliveryConfiguration,然后调用job工厂的配置初始化

config.yaml配置文件

#事务日志的数据源.
targetDataSource:
  ds_0: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/ds_0
    username: root
    password:
  ds_1: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/ds_1
    username: root
    password:

#事务日志的数据源.
transactionLogDataSource:
  ds_trans: !!org.apache.commons.dbcp.BasicDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/trans_log
    username: root
    password:

zkConfig:
  #注册中心的连接地址
  connectionString: localhost:2181
  
  #作业的命名空间
  namespace: Best-Efforts-Delivery-Job
  
  #注册中心的等待重试的间隔时间的初始值
  baseSleepTimeMilliseconds: 1000
  
  #注册中心的等待重试的间隔时间的最大值
  maxSleepTimeMilliseconds: 3000
  
  #注册中心的最大重试次数
  maxRetries: 3

jobConfig:
  #作业名称
  name: bestEffortsDeliveryJob
  
  #触发作业的cron表达式
  cron: 0/5 * * * * ?
  
  #每次作业获取的事务日志最大数量
  transactionLogFetchDataCount: 100
  
  #事务送达的最大尝试次数.
  maxDeliveryTryTimes: 3
  
  #执行送达事务的延迟毫秒数,早于此间隔时间的入库事务才会被作业执行
  maxDeliveryTryDelayMillis: 60000

BestEffortsDeliveryJobFactory核心源码:

public final class BestEffortsDeliveryJobFactory {
    //这个属性赋值通过有参构造方法进行赋值,是通过config.yaml配置的属性
    private final BestEffortsDeliveryConfiguration bedConfig;
    
    /**
     * Main中调用该init()方法,
     * Initialize best efforts delivery job.
     */
    public void init() {
        //根据config.yaml中配置的zkConfig节点,得到协调调度中心CoordinatorRegistryCenter
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(createZookeeperConfiguration(bedConfig));
        // 调度中心初始化
        regCenter.init();
        //构造elastic-job调度任务
        JobScheduler jobScheduler = new JobScheduler(regCenter, createBedJobConfiguration(bedConfig));
        jobScheduler.setField("bedConfig", bedConfig);
        jobScheduler.setField("transactionLogStorage", TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(bedConfig.getDefaultTransactionLogDataSource())));
        jobScheduler.init();
    }
    
    public ZookeeperConfiguration createZookeeperConfiguration(final BestEffortsDeliveryConfiguration bedConfig) {
        AsyncSoftTransactionZookeeperConfiguration zkConfig = bedConfig.getZkConfig();
        return new ZookeeperConfiguration(zkConfig.getConnectionString(), zkConfig.getNamespace(), zkConfig.getBaseSleepTimeMilliseconds(),
            zkConfig.getMaxSleepTimeMilliseconds(), zkConfig.getMaxRetries());
    }
    //创建BestEffortsDeliveryJob配置
    private JobConfiguration createBedJobConfiguration(final BestEffortsDeliveryConfiguration bedJobConfig) {
        // 根据config.yaml中配置的jobConfig节点得到job配置信息,且指定job类型为BestEffortsDeliveryJob
        JobConfiguration result = new JobConfiguration(bedJobConfig.getJobConfig().getName(), BestEffortsDeliveryJob.class, 1, bedJobConfig.getJobConfig().getCron());
        result.setFetchDataCount(bedJobConfig.getJobConfig().getTransactionLogFetchDataCount());
        result.setOverwrite(true);
        return result;
    }
}

JOB的代码

public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {
    
    @Setter
    private BestEffortsDeliveryConfiguration bedConfig;
    
    @Setter
    private TransactionLogStorage transactionLogStorage;
    
    @Override
    public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
        return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(), 
            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
    }
    
    @Override
    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
        try (
            Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
        } catch (final SQLException | TransactionCompensationException ex) {
            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, 
                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
            return false;
        }
        return true;
    }
    
    @Override
    public boolean isStreamingProcess() {
        return false;
    }
}

调用 #fetchData() 方法获取需要处理的事务日志 (TransactionLog),内部调用了 TransactionLogStorage#findEligibleTransactionLogs() 方法
调用 #processData() 方法处理事务日志,重试执行失败的 SQL,内部调用了 TransactionLogStorage#processData()
#fetchData() 和 #processData() 调用是 Elastic-Job 控制的。每一轮定时调度,每条事务日志只执行一次。当超过最大异步调用次数后,该条事务日志不再处理,所以生产使用时,最好增加下相应监控超过最大异步重试次数的事务日志。

参考:http://www.iocoder.cn/Sharding-JDBC/transaction-bed/

https://www.jianshu.com/p/0f1a938c9017

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: