美文网首页
TX-LCN 核心流程源码分析

TX-LCN 核心流程源码分析

作者: 丑人林宗己 | 来源:发表于2019-06-07 14:37 被阅读0次

TX-LCN核心源码解读

TX-LCN是基于Java编写的分布式事务解决方案框架,主要提供三种主流的解决方案

  • LCN模式,通过代理JDBC Connection来控制协调多组本地原子事务的提交与关闭
  • TCC模式,属于框架级别解决方案,对业务入侵性极大
  • TXC模式,核心方案为查询 + 分布式锁的分布式事务解决方案,由淘宝团队提出

核心组件

  • TC,作为分布式事务组件的客户端角色,主要作用在于治理本地事务
  • TM,作为分布式事务组件的服务端角色,主要作用在于协调事务组

核心概念

  • 事务组,group,描述整个分布式环境下运行的各个事务(以一个request所需要完成的事务)组合而成的一组事务。
  • 事务单元,unit,描述一个事务组内除开主事务之外的从事务,一个从事务表示一个事务单元。

核心流程

事务核心流程.png

核心源码

LCN模式作为实践方案,源码也以LCN 作为解读对象。LCN 模式是什么模式?

LCN模式是通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

创建事务组

/**
 * Client创建事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionInfo transactionInfo
 * @param transactionType transactionType
 * @throws TransactionException 创建group失败时抛出
 */
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
    throws TransactionException {
    //创建事务组
    try {
        // 日志
        txLogger.transactionInfo(groupId, unitId,
                                 "create group > {} > groupId: {xid}, unitId: {uid}", transactionType);
        // 创建事务组消息
        reliableMessenger.createGroup(groupId);
        // 缓存发起方切面信息
        aspectLogger.trace(groupId, unitId, transactionInfo);
    } catch (RpcException e) {
        // 通讯异常
        dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
    } catch (LcnBusinessException e) {
        // 创建事务组业务失败
        dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
    }
    txLogger.transactionInfo(groupId, unitId, "create group over");
}

TC 发起创建事务组仅仅是像服务端发起请求,参数为groupId ,核心的代码在于TM 接到请求后的处理。


public class CreateGroupExecuteService implements RpcExecuteService {

    // ...
    @Override
    public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
        // ...
        transactionManager.begin(transactionCmd.getGroupId());
    }
}

public class DefaultDTXContextRegistry implements DTXContextRegistry {

    private final FastStorage fastStorage;

    @Override
    public DTXContext create(String groupId) throws TransactionException {
        // ..
        fastStorage.initGroup(groupId);
    }
}


public class RedisStorage implements FastStorage {

    // ...
    @Override
    public void initGroup(String groupId) {
        // 将groupId存入Redis
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, "root", "");
        redisTemplate.expire(REDIS_GROUP_PREFIX + groupId, managerConfig.getDtxTime() + 10000, TimeUnit.MILLISECONDS);
    }
}

加入事务组


/**
 * Client加入事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param transactionInfo transactionInfo
 * @throws TransactionException 加入事务组失败时抛出
 */
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo) throws TransactionException {
    
    // 询问TM加入事务组
    // 该groupId由远程RPC通过header方式携带到从事务
    reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState());

    // 异步检测
    dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
    
    // ...
}

// 
@Override
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
    txLogger.taskInfo(groupId, unitId, "start delay checking task");
    // 异步阻塞的方式
    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
        try {
            TxContext txContext = globalContext.txContext(groupId);
            if (Objects.nonNull(txContext)) {
                synchronized (txContext.getLock()) {
                    txLogger.info(groupId, unitId, Transactions.TAG_TASK,
                            "checking waiting for business code finish.");
                    txContext.getLock().wait();
                }
            }
            int state = reliableMessenger.askTransactionState(groupId, unitId);// 询问事务组是否成功
            txLogger.taskInfo(groupId, unitId, "ask transaction state {}", state);
            if (state == -1) {
                txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
                onAskTransactionStateException(groupId, unitId, transactionType);
            } else {
                transactionCleanTemplate.clean(groupId, unitId, transactionType, state);// 事务清理,即commit or rollback
                aspectLogger.clearLog(groupId, unitId);
            }

        } catch (RpcException e) {
            onAskTransactionStateException(groupId, unitId, transactionType);
        } catch (TransactionClearException | InterruptedException e) {
            txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
        }
    }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);// 时间为最大事务时间,该时间由TM配置,在TC初始化时从TM放取到
    delayTasks.put(groupId + unitId, scheduledFuture);
}

//  询问事务是否成功
private void onAskTransactionStateException(String groupId, String unitId, String transactionType) {
    try {
        // 通知TxManager事务补偿
        txMangerReporter.reportTransactionState(groupId, unitId, TxExceptionParams.ASK_ERROR, 0);
        log.warn("{} > has compensation info!", transactionType);

        // 事务回滚, 保留适当的补偿信息
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, 0);
    } catch (TransactionClearException e) {
        log.error("{} > clean transaction error.", transactionType);
    }
}

TC 核心工作是申请加入事务组,并启动异步任务在事务最大时间后访问TM 事务组的全局事务状态来进行事务协调。

创建异步任务对象并将其缓存在本地内存的delayTasks, 如果在事务最大时间内已经完成并调用则将该任务取消。

@Override
public void stopDelayChecking(String groupId, String unitId) {
    ScheduledFuture scheduledFuture = delayTasks.get(groupId + unitId);
    if (Objects.nonNull(scheduledFuture)) {
        txLogger.taskInfo(groupId, unitId, "cancel {}:{} checking.", groupId, unitId);
        scheduledFuture.cancel(true); // 取消任务
    }
}

而在TM 端则进行如下处理


// 加入事务组
// RedisStorage
public void saveTransactionUnitToGroup(String groupId, TransactionUnit transactionUnit) throws FastStorageException {
    if (Optional.ofNullable(redisTemplate.hasKey(REDIS_GROUP_PREFIX + groupId)).orElse(false)) {
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, transactionUnit.getUnitId(), transactionUnit);
        return;
    }
    throw new FastStorageException("attempts to the non-existent transaction group " + groupId, FastStorageException.EX_CODE_NON_GROUP);
}

// TC 询问该事务组全局事务状态
public int transactionState(String groupId) {

    
    int state = exceptionService.transactionState(groupId); // 查询数据库t_tx_exception得到该事务的状态
    
    //存在数据时返回数据状态
    if (state != -1) {
        return state;
    }
    
    // 查询redis该事务组的全局状态
    return dtxContextRegistry.transactionState(groupId);
    
    // 为什么会先查数据库,再查redis ? 后文会有说明,主要是考虑事务补偿时的问题
}

// 返回的最终状态
public class AskTransactionStateExecuteService implements RpcExecuteService {

    @Override
    public Serializable execute(TransactionCmd transactionCmd) {
        int state = transactionManager.transactionState(transactionCmd.getGroupId());
        return state == -1 ? 0 : state;
    }
}

通知事务组

主事务的业务代码执行完毕,最终必须调用通知事务组进行全局事务协调。通知完成后进行事务清理


/**
 * Client通知事务组操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param state           transactionState
 */
public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
    // ...
    // 事务通知
    reliableMessenger.notifyGroup(groupId, state);
    // 事务清理
    transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
    // 通知异常(RPC调用异常)
    dtxExceptionHandler.handleNotifyGroupMessageException(Arrays.asList(groupId, state, unitId, transactionType), e);
    // ...
    
}

// 当TC调用TM抛出异常时,会正常的按照当前事务的状态进行提交,并将结果上报到TM

public void handleNotifyGroupMessageException(Object params, Throwable ex) {
    
    // 参数中取出事务的状态
    // ....

    // 按状态正常结束事务(切面补偿记录将保留)
    // TxManager 存在请求异常或者响应异常两种情况。当请求异常时这里的业务需要补偿,当响应异常的时候需要做状态的事务清理。
    // 请求异常时
    //     参与放会根据上报补偿记录做事务的提交。
    // 响应异常时
    //     参与反会正常提交事务,本地业务提示事务。

    // 该两种情况下补偿信息均可以忽略,可直接把本地补偿记录数据删除。


    String unitId = (String) paramList.get(2);
    String transactionType = (String) paramList.get(3);
    try {
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, state);// 本地事务提交
    } catch (TransactionClearException e) {
        log.error("{} > compensationClean transaction error.", transactionType);
    }

    // 上报Manager,上报直到成功.
    txMangerReporter.reportTransactionState(groupId, null, TxExceptionParams.NOTIFY_GROUP_ERROR, state);
    // 提交的事务记录到t_tx_exception表中,所以会看到前文TC询问事务状态时,会优先查询数据库,而不是直接查redis
}


通知事务组的概念,应该理解为,主事务告知TM 进行全部的事务协调,即TM 仅会通知各个从事务进行commit or rollback,而不会通知主事务进行commit or rollback 。因为在前文看到创建事务组时,TM 并没有将主事务unitId 记录下来。而从事务加入事务组时,除了记录全局事务组Id,还包括事务单元unitId .


public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
    try {
        // 从redis取事务状态
        int transactionState = transactionManager.transactionStateFromFastStorage(transactionCmd.getGroupId());
        boolean hasThrow = false;
        if (transactionState == 0) {
            commitState = 0;
            hasThrow = true;
        }
        // 事务状态为1进行全局事务提交
        if (commitState == 1) {
            transactionManager.commit(dtxContext);
        } else if (commitState == 0) {
            transactionManager.rollback(dtxContext);
        }
        // ...
    } catch (TransactionException e) {
        throw new TxManagerException(e);
    } finally {
        transactionManager.close(transactionCmd.getGroupId());
        // 系统日志
        txLogger.transactionInfo(transactionCmd.getGroupId(), "", "notify group over");
    }
    return null;
}

// 事务通知
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    for (TransactionUnit transUnit : dtxContext.transactionUnits()) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.info(dtxContext.getGroupId(),
                notifyUnitParams.getUnitId(), Transactions.TAG_TRANSACTION, "notify %s's unit: %s",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            // 这里在5.0.1会出现信道问题,什么是信道问题?比如此时有两台push注册到TM上,而某一刻的全局事务所在的本地事务只在其中一台,而通知的时候如果modId一致则会取到第一个
            // 如下get(0) . 解决的办法是生成modId时去的是Mac地址+端口+服务名称,保证了不同实例的全局唯一
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));
            if (!MessageUtils.statusOk(respMsg)) {
                // 提交/回滚失败的消息处理
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // 提交/回滚通讯失败
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.transactionInfo(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}

// 当通知出现异常时,将信息记录到t_tx_exception表
public class ManagerRpcExceptionHandler implements RpcExceptionHandler {

    @Override
    public void handleNotifyUnitMessageException(Object params, Throwable e) {
        // notify unit message error, write txEx
        List paramList = ((List) params);
        String modName = (String) paramList.get(1);

        NotifyUnitParams notifyUnitParams = (NotifyUnitParams) paramList.get(0);
        WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(notifyUnitParams.getGroupId(),
                notifyUnitParams.getUnitId(), modName, notifyUnitParams.getState());
        writeTxExceptionReq.setRegistrar((short) 0);
        compensationService.writeTxException(writeTxExceptionReq);// 记录到t_tx_exception
        // 记住客户端主动查询时,优先查数据库,再查redis的事务状态
    }
}

总结

TX-LCN作为分布式解决方案是比较优秀的方案,代码逻辑也比较简单,但是如果应用Crash,就可能出现数据不一致的情况,而且这种数据不一致的情况必须人肉修复。

比如主事务在进行NotifyGroup 时出现RpcException 主事务会根据当前事务的状态进行commit or rollback ,之后会上报TM 记录补偿信息,假如在记录补偿时失败了(应用在这个点Crash)了,那么主事务提交了,并且TM 并不能完整地协调好从事务 的全局事务状态。

为什么需要人肉修复呢?其实从源码上可以分析出,TX-LCN解决的场景时将本地的事务通过事务协调器进行协调,但是本质上并没有将事务分布式节点化,即本地事务的成功与失败无法在不同的节点进行处理

相关文章

网友评论

      本文标题:TX-LCN 核心流程源码分析

      本文链接:https://www.haomeiwen.com/subject/nnimxctx.html