# EasyCore.DistributedTransaction.DTM **Repository Path**: gitmy/easy-core.-distributed-transaction.-dtm ## Basic Information - **Project Name**: EasyCore.DistributedTransaction.DTM - **Description**: EasyCore.DistributedTransaction.DTM - **Primary Language**: C# - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2026-01-12 - **Last Updated**: 2026-01-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # EasyCore.DistributedTransaction.DTM 一、分布式事务概述 在现代微服务架构下,一个完整的业务往往需要多个服务配合完成,如订单服务、库存服务、支付服务等。当这些服务分别持有自己的数据库或中间件资源时,原有的单机事务(如 MySQL 的事务)无法涵盖整个业务流程,此时就产生了分布式事务的问题。 什么是分布式事务? 分布式事务(Distributed Transaction)指一个事务操作涉及到多个数据库或服务,要求它们最终保持数据一致性。分布式事务的难点在于: 多服务、多数据源协调; 网络延迟或异常; 跨进程通信; 各参与服务的状态不一致。 分布式事务的核心目标是:保证在多个参与方之间实现一致性执行和失败回滚,确保最终状态一致或可补偿。 二、EasyCore.DistributedTransaction 分布式事务常见的三种模式 目前主流的分布式事务处理模型主要包括: TCC(Try-Confirm-Cancel)模式 Saga 模式 Msg(消息事务)模式 这些模式都属于“柔性事务”范畴,即放弃传统 ACID 强一致性,追求最终一致性,提高系统可用性和性能。 三、TCC 模式详解 1. 模式简介 TCC(Try-Confirm-Cancel)模式由阿里提出,是一种显式三阶段事务模型,通过接口来控制资源预留、确认提交和回滚。 它的核心思想是将业务逻辑分为三步: Try:资源预留(检查并锁定资源,但不真正变更)。 Confirm:确认提交(真正执行变更操作)。 Cancel:取消事务(释放资源或回滚预留)。 四、Saga 模式详解 1. 模式简介 Saga 模式由 Hector & Garcia-Molina 提出,它将一个长事务拆分成多个局部事务,每个事务有对应的补偿操作,在失败时依次执行补偿,实现事务回滚。 Saga 的事务流程是线性顺序的,一般包括: 正向动作(Action):执行主业务操作; 补偿动作(Compensating Action):若失败,则依次反向执行补偿操作。 Saga 可采用顺序执行或并发执行。 五、Msg 模式详解(事务消息模式) 1. 模式简介 Msg(事务消息)模式,又称可靠消息最终一致性模式,是基于消息队列实现的一种分布式事务模式。其核心思想是:本地事务与消息发送绑定在一起,保证只要本地业务提交,消息一定会发出。 2. 实现流程(以 DTM 为例) 调用 Prepare 接口(DTM 保存事务初始状态); 本地执行业务逻辑,记录“待发送消息”; 调用 Submit,DTM 开始向下游服务投递消息; 消费服务执行业务逻辑; 返回成功后,DTM 标记完成。 如果中途失败,DTM 将重试消息投递,直到成功或转为死信。 六、使用EasyCore.DistributedTransaction.DTM 1. 数据持久化 EasyCore.DistributedTransaction.DTM 支持mysql和sqlserver mysql:EasyCore.DistributedTransaction.DTM.MySql ``` builder.Services.EasyCoreDistributedTransactionDtm(options => { options.EasyCoreDtmMySql(options => { options.ConnectionString = "server=localhost;port=3307;user id=root;password=123;database=EasyCoreDtm"; options.DtmDataBase = "EasyCoreDtm"; }); options.DtmUrl = "http://localhost:36789"; }); ``` sqlserver:EasyCore.DistributedTransaction.DTM.SqlServer ``` builder.Services.EasyCoreDistributedTransactionDtm(options => { options.EasyCoreDtmSqlService(options => { options.ConnectionString = "Server=192.168.157.142;Database=EasyCoreDtm;User Id=sa;Password=Sa123456;TrustServerCertificate=True;Connect Timeout=10;"; options.DtmDataBase = "EasyCoreDtm"; }); options.DtmUrl = "http://localhost:36789"; }); ``` 注册时需要指向Dtm地址。 2.Msg模式 2.1 本地事务提交 ``` private readonly IEasyCoreDtmMsg _dtmMsg; public MsgController(IEasyCoreDtmMsg dtmMsg) => _dtmMsg = dtmMsg; [HttpPost("SubmitMsg")] public async Task PostSubmitMsg(CancellationToken cancellationToken) { var msgData = new EasyCoreDtmMsgData() { Action = "http://localhost:5039/api/Msg/PostMsgMesagger", Data = new PostDto { d = 1, Title = "test" } }; // do something return await _dtmMsg.MsgDoAndSubmitAsync("http://localhost:5039/api/Msg/msgqueryprepared", cancellationToken, msgData); //return await _dtmMsg.MsgEnableWaitResultAsync("http://localhost:5039/api/Msg/msgqueryprepared", cancellationToken, msgData); } ``` msgData 需要指向子事务地址,以及传递的参数。 MsgEnableWaitResultAsync 是用于启用“同步等待事务结果”机制。等待消息事务的最终执行结果,而不是异步提交后立刻返回。这样做的效果是:你可以知道整个事务是否真的全部成功,否则只知道提交成功,但实际子操作是否成功还不清楚。 MsgDoAndSubmitAsync 是立即返回的,不等子事务处理完。 2.2 Msg子事务 ``` [HttpPost("PostMsgMesagger")] public async Task PostMsgMesagger([FromBody] PostDto post) { // do something return await _dtmMsg.MsgSubTransactionAsync(Request.Query); } ``` 子事务执行失败时,间隔一段时间后会一直重试执行子事务。直到成功执行。 3. TCC模式 ``` public class TccController : ControllerBase { private readonly IEasyCoreDtmTcc _dtmTcc; public TccController(IEasyCoreDtmTcc dtmTcc) => _dtmTcc = dtmTcc; [HttpPost("SubmitTcc")] public async Task PostSubmitMsg(CancellationToken cancellationToken) { var tccData = new EasyCoreDtmTccData() { TryUrl = "http://localhost:5039/api/Tcc/TccTry", ConfirmUrl = "http://localhost:5039/api/Tcc/TccConfirm", CancelUrl = "http://localhost:5039/api/Tcc/TccCancel", Data = new PostDto { Id = 1, Title = "test" } }; // do something return await _dtmTcc.TccCallBranch(cancellationToken, tccData); } } ``` tccData 需要指向子事务的TryUrl,ConfirmUrl,CancelUrl 以及提交的数据。 3.1 Try尝试子事务 ``` [HttpPost("TccTry")] public async Task TccTry([FromBody] PostDto post) { // do something // throw new Exception(); // If an error occurs here, the TccCancel function will be called. return await _dtmTcc.TransSucceedResponse(); } ``` Try尝试子事务出现异常执行时,会调用Cancel回滚事务。 3.2 Confirm提交子事务 ``` [HttpPost("TccConfirm")] public async Task TccConfirm([FromBody] PostDto post) { // do something // throw new Exception(); // If an error occurs here, the TccConfirm function will be repeatedly called until it succeeds. return await _dtmTcc.TransSucceedResponse(); } ``` Confirm提交子事务出现异常时,间隔一段时间后会一直重试执行Confirm提交子事务。直到成功执行。 3.3 Cancel回滚事务 ``` [HttpPost("TccCancel")] public async Task TccCancel([FromBody] PostDto post) { // do something // If an error occurs here, the TccCancel function will be repeatedly called until it succeeds. return await _dtmTcc.TransSucceedResponse(); } ``` Cancel回滚事务出现异常时,间隔一段时间后会一直重试执行Cancel回滚事务。直到成功执行。 4. Saga模式 ``` public class SagaController : ControllerBase { private readonly IEasyCoreDtmSaga _dtmSaga; public SagaController(IEasyCoreDtmSaga dtmSaga) => _dtmSaga = dtmSaga; [HttpPost("SubmitMsg")] public async Task PostSubmitSaga(CancellationToken cancellationToken) { var sagaData = new EasyCoreDtmSagaData { Action = "http://localhost:5039/api/Saga/PostSagaMesagger", Compensate = "http://localhost:5039/api/Saga/CompensateMsg", Data = new PostDto { Id = 1, Title = "test" } }; return await _dtmSaga.SagaSubmitAsync(cancellationToken, sagaData); } } ``` sagaData 需指向子事务提交地址,子事务补偿地址以及传递的数据。 4.1 子事务提交 TransFailureResponse 在子事务中执行,会调用补偿事务。 ``` [HttpPost("PostSagaMesagger")] public async Task PostSagaMesagger([FromBody] PostDto post) { // do something // TransFailureResponse is executed in a subtransaction and will roll back the transaction. return await _dtmSaga.TransFailureResponse(); //return await _dtmSaga.TransSucceedResponse(); } ``` 4.2 子事务补偿 “TransFailureResponse”操作会在回滚事务中执行,并会不断重复调用该补偿事务,直至补偿事务成功执行完毕。 ``` [HttpPost("CompensateMsg")] public async Task PostSagaCompensateMsg([FromBody] PostDto post) { // do something // TransFailureResponse is executed in the rollback transaction and will call the rollback transaction again until the rollback transaction is successfully executed. //return await _dtmSaga.TransFailureResponse(); return await _dtmSaga.TransSucceedResponse(); } ``` !!!需要注意的是在Saga模式下,如果Saga子事务执行失败,必须显示调用await _dtmSaga.TransSucceedResponse()。!!!