# 分布式事务架构实践:Seata Saga与TCC模式的工程实现
在微服务架构成为主流的今天,分布式事务管理成为保证数据一致性的关键技术挑战。基于Seata框架的Saga模式和TCC模式,为不同业务场景提供了灵活的事务解决方案,特别是在金融级应用中对高可用和最终一致性的要求极为严格。
## Seata架构概览与部署
Seata提供AT、TCC、Saga和XA四种事务模式,支持多种部署方式,满足不同复杂度的业务需求。
```yaml
# seata-server-deployment.yaml - highly available Seata Server deployment (3 replicas,
# rolling update that never drops below the desired replica count).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seata-server
  namespace: seata-system
  labels:
    app: seata-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: seata-server
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: seata-server
    spec:
      containers:
        - name: seata-server
          image: seataio/seata-server:1.6.0
          ports:
            # 8091 is the transaction service (RPC) port, 7091 the console HTTP port;
            # the original snippet had the two names swapped.
            - containerPort: 8091
              name: service
            - containerPort: 7091
              name: console
          env:
            - name: SEATA_CONFIG_NAME
              value: "file:/seata-server/resources/registry"
            - name: STORE_MODE
              value: "db"
            # NOTE(review): SERVER_NODE is documented as a numeric node id, while the pod
            # name injected here is not numeric - confirm the server accepts it.
            - name: SERVER_NODE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          # 8091 speaks the Seata RPC protocol, not HTTP, so an httpGet probe against it
          # can never succeed; a TCP probe checks that the service port is accepting
          # connections.
          # NOTE(review): if the console exposes /health without authentication in your
          # version, an httpGet probe on the console port is a stricter alternative.
          readinessProbe:
            tcpSocket:
              port: service
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            tcpSocket:
              port: service
            initialDelaySeconds: 60
            periodSeconds: 30
          volumeMounts:
            # Mount only registry.conf: mounting the ConfigMap over the whole resources
            # directory would hide every other file shipped in that directory.
            - name: config-volume
              mountPath: /seata-server/resources/registry.conf
              subPath: registry.conf
      volumes:
        - name: config-volume
          configMap:
            name: seata-server-config
---
# Registry and config-center settings consumed by the Seata Server pods.
# NOTE(review): the Nacos credentials below are stored in plain text in a ConfigMap;
# consider moving them to a Secret.
apiVersion: v1
kind: ConfigMap
metadata:
  name: seata-server-config
  namespace: seata-system
data:
  registry.conf: |
    registry {
      type = "nacos"
      nacos {
        application = "seata-server"
        serverAddr = "nacos-headless.nacos.svc.cluster.local:8848"
        namespace = "seata"
        cluster = "default"
        username = "nacos"
        password = "nacos"
      }
    }
    config {
      type = "nacos"
      nacos {
        serverAddr = "nacos-headless.nacos.svc.cluster.local:8848"
        namespace = "seata"
        group = "SEATA_GROUP"
        username = "nacos"
        password = "nacos"
        dataId = "seataServer.properties"
      }
    }
```
## TCC模式核心实现
TCC(Try-Confirm-Cancel)模式通过业务逻辑层面的补偿机制实现最终一致性,适合高并发短事务场景。
```java
// TCC模式银行转账示例
package com.example.bank.transaction;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
import java.util.Date;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * TCC contract for a bank transfer.
 *
 * <p>{@link #prepareTransfer} is the Try phase; the annotation on it names
 * {@link #commit} and {@link #rollback} as the second-phase methods.
 */
@LocalTCC
public interface TransferTccService {

    /**
     * Try phase of the transfer.
     *
     * @param actionContext context object of the TCC action
     * @param fromAccount   source account number, stored in the context as {@code fromAccount}
     * @param toAccount     target account number, stored in the context as {@code toAccount}
     * @param amount        amount to transfer, stored in the context as {@code amount}
     * @return {@code true} when the Try phase succeeded
     */
    @TwoPhaseBusinessAction(name = "transferTccService", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareTransfer(BusinessActionContext actionContext,
                            @BusinessActionContextParameter(paramName = "fromAccount") String fromAccount,
                            @BusinessActionContextParameter(paramName = "toAccount") String toAccount,
                            @BusinessActionContextParameter(paramName = "amount") BigDecimal amount);

    /**
     * Second-phase commit (Confirm).
     *
     * @param actionContext context object of the TCC action
     * @return {@code true} when the confirm succeeded
     */
    boolean commit(BusinessActionContext actionContext);

    /**
     * Second-phase rollback (Cancel).
     *
     * @param actionContext context object of the TCC action
     * @return {@code true} when the cancel succeeded
     */
    boolean rollback(BusinessActionContext actionContext);
}
@Service
class TransferTccServiceImpl implements TransferTccService {
private final AccountRepository accountRepository;
private final FrozenBalanceRepository frozenRepository;
@Override
@Transactional
public boolean prepareTransfer(BusinessActionContext actionContext,
String fromAccount,
String toAccount,
BigDecimal amount) {
// 检查账户状态
Account account = accountRepository.findByAccountNo(fromAccount);
if (account == null) {
throw new RuntimeException("转出账户不存在");
}
if (account.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("账户余额不足");
}
// 冻结转出账户资金
BigDecimal frozenBalance = account.getFrozenBalance() != null ?
account.getFrozenBalance() : BigDecimal.ZERO;
account.setFrozenBalance(frozenBalance.add(amount));
accountRepository.save(account);
// 记录冻结信息
FrozenBalance frozen = new FrozenBalance();
frozen.setXid(actionContext.getXid());
frozen.setAccountNo(fromAccount);
frozen.setAmount(amount);
frozen.setStatus(FrozenStatus.TRY);
frozen.setCreatedAt(new Date());
frozenRepository.save(frozen);
// 记录事务上下文
actionContext.addActionContext("frozenId", frozen.getId());
actionContext.addActionContext("fromAccount", fromAccount);
actionContext.addActionContext("toAccount", toAccount);
actionContext.addActionContext("amount", amount);
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext actionContext) {
String xid = actionContext.getXid();
String fromAccount = (String) actionContext.getActionContext("fromAccount");
String toAccount = (String) actionContext.getActionContext("toAccount");
BigDecimal amount = (BigDecimal) actionContext.getActionContext("amount");
<"fbb.j9k5.org.cn"><"tbh.j9k5.org.cn"><"opj.j9k5.org.cn">
// 查询冻结记录
FrozenBalance frozen = frozenRepository.findByXidAndAccountNo(xid, fromAccount);
if (frozen == null || frozen.getStatus() != FrozenStatus.TRY) {
throw new RuntimeException("冻结记录不存在或状态异常");
}
// 执行实际转账
Account from = accountRepository.findByAccountNo(fromAccount);
Account to = accountRepository.findByAccountNo(toAccount);
// 扣减转出账户余额和冻结金额
from.setBalance(from.getBalance().subtract(amount));
from.setFrozenBalance(from.getFrozenBalance().subtract(amount));
accountRepository.save(from);
// 增加转入账户余额
to.setBalance(to.getBalance().add(amount));
accountRepository.save(to);
// 更新冻结记录状态
frozen.setStatus(FrozenStatus.CONFIRM);
frozen.setConfirmedAt(new Date());
frozenRepository.save(frozen);
// 记录转账流水
TransferRecord record = new TransferRecord();
record.setXid(xid);
record.setFromAccount(fromAccount);
record.setToAccount(toAccount);
record.setAmount(amount);
record.setStatus(TransferStatus.SUCCESS);
record.setCreatedAt(new Date());
transferRecordRepository.save(record);
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext actionContext) {
String xid = actionContext.getXid();
String fromAccount = (String) actionContext.getActionContext("fromAccount");
// 查询冻结记录
FrozenBalance frozen = frozenRepository.findByXidAndAccountNo(xid, fromAccount);
if (frozen == null) {
return true; // 无冻结记录,无需回滚
}
if (frozen.getStatus() == FrozenStatus.CANCEL) {
return true; // 已回滚过
}
// 解冻资金
Account account = accountRepository.findByAccountNo(fromAccount);
if (account != null) {
account.setFrozenBalance(account.getFrozenBalance().subtract(frozen.getAmount()));
accountRepository.save(account);
}
// 更新冻结记录状态
frozen.setStatus(FrozenStatus.CANCEL);
frozen.setCanceledAt(new Date());
frozenRepository.save(frozen);
// 记录回滚日志
CompensateLog log = new CompensateLog();
log.setXid(xid);
log.setServiceName("transferTccService");
log.setAction("rollback");
log.setStatus(CompensateStatus.SUCCESS);
log.setCreatedAt(new Date());
compensateLogRepository.save(log);
return true;
}
}
```
## Saga模式编排引擎
Saga模式适用于长事务场景,通过状态机编排服务调用,支持正向服务和补偿服务定义。
```java
// Saga状态机编排示例
package com.example.order.saga;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.statelang.domain.StateMachineInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class OrderSagaService {
@Autowired
private StateMachineEngine stateMachineEngine;
public StateMachineInstance createOrder(String userId, List
Map
startParams.put("userId", userId);
startParams.put("orderItems", items);
startParams.put("totalAmount", calculateTotalAmount(items));
// 启动Saga状态机
StateMachineInstance instance = stateMachineEngine.start(
"orderCreationSaga",
null,
startParams
);
return instance;
}
}
```
Saga状态机定义(JSON格式):
```json
{
  "Name": "orderCreationSaga",
  "Version": "1.0.0",
  "Comment": "订单创建Saga流程",
  "StartState": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "validateOrder",
      "CompensateState": "CompensateValidate",
      "Next": "ReserveInventory",
      "Input": [
        "$.[userId]",
        "$.[orderItems]"
      ],
      "Output": {
        "validationResult": "$.validationResult",
        "orderId": "#uuid"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA"
      }
    },
    "ReserveInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "reserveInventory",
      "CompensateState": "CompensateInventory",
      "Next": "ProcessPayment",
      "Input": [
        "$.[orderId]",
        "$.[orderItems]"
      ],
      "Output": {
        "inventoryReserved": "$.success"
      },
      "Catch": [
        {
          "Exceptions": ["java.lang.Exception"],
          "Next": "CompensationTrigger"
        }
      ]
    },
    "ProcessPayment": {
      "Type": "ServiceTask",
      "ServiceName": "paymentService",
      "ServiceMethod": "processPayment",
      "CompensateState": "CompensatePayment",
      "Next": "ConfirmOrder",
      "Input": [
        "$.[orderId]",
        "$.[userId]",
        "$.[totalAmount]"
      ],
      "Output": {
        "paymentId": "$.paymentId",
        "paymentStatus": "$.status"
      },
      "Retry": [
        {
          "Exceptions": ["PaymentException"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "Exceptions": ["java.lang.Exception"],
          "Next": "CompensationTrigger"
        }
      ]
    },
    "ConfirmOrder": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "confirmOrder",
      "Next": "SuccessEnd",
      "Input": [
        "$.[orderId]"
      ]
    },
    "CompensateValidate": {
      "Type": "ServiceTask",
      "ServiceName": "orderService",
      "ServiceMethod": "compensateValidate"
    },
    "CompensateInventory": {
      "Type": "ServiceTask",
      "ServiceName": "inventoryService",
      "ServiceMethod": "releaseInventory"
    },
    "CompensatePayment": {
      "Type": "ServiceTask",
      "ServiceName": "paymentService",
      "ServiceMethod": "refundPayment"
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger",
      "Next": "FailEnd"
    },
    "SuccessEnd": {
      "Type": "Succeed"
    },
    "FailEnd": {
      "Type": "Fail",
      "ErrorCode": "ORDER_CREATE_FAILED",
      "Message": "订单创建失败"
    }
  }
}
```
## 混合模式应用实践
在实际业务中,往往需要结合TCC和Saga模式的优点,形成混合事务解决方案。
```java
// 混合事务模式实现
package com.example.ecommerce.transaction;
/**
 * Places an order by combining a TCC inventory reservation with a Saga for the rest of
 * the flow.
 *
 * <p>NOTE(review): {@code coordinator.getSagaStatus(...)} is not defined on the
 * {@code TransactionCoordinator} shown alongside this class - confirm where it lives.
 * NOTE(review): no confirm call for the TCC reservation is visible on the success path;
 * presumably the Saga confirms it - verify.
 */
@Service
public class HybridTransactionService {

    /** Maximum time to wait for the Saga to reach a terminal status. */
    private static final long SAGA_TIMEOUT_MILLIS = 30000;
    /** Pause between two status polls. */
    private static final long POLL_INTERVAL_MILLIS = 500;

    private final TccTransactionService tccService;
    private final SagaTransactionService sagaService;
    private final TransactionCoordinator coordinator;

    public HybridTransactionService(TccTransactionService tccService,
                                    SagaTransactionService sagaService,
                                    TransactionCoordinator coordinator) {
        this.tccService = tccService;
        this.sagaService = sagaService;
        this.coordinator = coordinator;
    }

    /**
     * Reserves inventory through TCC, runs the order Saga and waits for its outcome.
     *
     * <p>The reservation is cancelled when the Saga throws, fails or is compensated. On
     * timeout or interruption the Saga may still be running, so the reservation is left
     * untouched and only a failure result is returned.
     *
     * @param request the order to place
     * @return the Saga output on success, otherwise a failure result
     */
    public TransactionResult placeOrder(OrderRequest request) {
        TccResult inventoryResult = tccService.reserveInventory(
                request.getItems(),
                request.getOrderId());
        if (!inventoryResult.isSuccess()) {
            return TransactionResult.fail("库存预留失败");
        }

        SagaInstance sagaInstance;
        SagaStatus terminalStatus;
        try {
            sagaInstance = sagaService.startOrderSaga(request);
            terminalStatus = awaitTerminalStatus(sagaInstance);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return TransactionResult.fail("等待中断");
        } catch (Exception e) {
            try {
                tccService.cancelReservation(inventoryResult.getXid());
            } catch (RuntimeException cancelFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cancelFailure);
            }
            throw e;
        }

        if (terminalStatus == null) {
            return TransactionResult.fail("事务处理超时");
        }
        // NOTE(review): output and error message are read from the instance obtained at
        // start; confirm it reflects the final Saga state.
        switch (terminalStatus) {
            case SUCCEEDED:
                return TransactionResult.success(sagaInstance.getOutput());
            case FAILED:
                // The Saga is finished and will not use the reservation any more.
                tccService.cancelReservation(inventoryResult.getXid());
                return TransactionResult.fail(sagaInstance.getErrorMessage());
            default:
                // COMPENSATED is the only other status awaitTerminalStatus returns.
                tccService.cancelReservation(inventoryResult.getXid());
                return TransactionResult.fail("事务已回滚");
        }
    }

    /**
     * Polls the coordinator until the Saga is SUCCEEDED, FAILED or COMPENSATED.
     *
     * @return the terminal status, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private SagaStatus awaitTerminalStatus(SagaInstance instance) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < SAGA_TIMEOUT_MILLIS) {
            SagaStatus status = coordinator.getSagaStatus(instance.getId());
            // Plain comparisons instead of a switch so an unknown (null) status is
            // treated as "still running" rather than throwing.
            if (status == SagaStatus.SUCCEEDED
                    || status == SagaStatus.FAILED
                    || status == SagaStatus.COMPENSATED) {
                return status;
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        return null;
    }
}
// 事务协调器实现
@Component
public class TransactionCoordinator {
private final Map
new ConcurrentHashMap<>();
public String beginTransaction(String businessType) {
String xid = UUID.randomUUID().toString();
TransactionContext context = new TransactionContext(xid, businessType);
contextMap.put(xid, context);
// 记录事务开始
transactionLogRepository.save(
TransactionLog.begin(xid, businessType)
);
return xid;
}
public void registerBranch(String xid, String branchId,
BranchType branchType) {
TransactionContext context = contextMap.get(xid);
if (context != null) {
context.addBranch(branchId, branchType);
// 记录分支注册
branchLogRepository.save(
BranchLog.register(xid, branchId, branchType)
);
}
}
public void commit(String xid) {
TransactionContext context = contextMap.get(xid);
if (context == null) {
throw new RuntimeException("事务上下文不存在");
}
// 执行两阶段提交
try {
// 第一阶段:准备
for (BranchInfo branch : context.getBranches()) {
if (branch.getType() == BranchType.TCC) {
tccCoordinator.prepare(branch.getId());
}
}
// 第二阶段:提交
for (BranchInfo branch : context.getBranches()) {
if (branch.getType() == BranchType.TCC) {
tccCoordinator.commit(branch.getId());
} else if (branch.getType() == BranchType.SAGA) {
sagaCoordinator.confirm(branch.getId());
}
}
// 记录事务完成
transactionLogRepository.save(
TransactionLog.commit(xid)
);
contextMap.remove(xid);
} catch (Exception e) {
// 失败处理
rollback(xid);
throw e;
}
}
public void rollback(String xid) {
TransactionContext context = contextMap.get(xid);
if (context == null) {
throw new RuntimeException("事务上下文不存在");
}
// 按注册逆序执行回滚
List
Collections.reverse(branches);
for (BranchInfo branch : branches) {
try {
if (branch.getType() == BranchType.TCC) {
tccCoordinator.rollback(branch.getId());
} else if (branch.getType() == BranchType.SAGA) {
sagaCoordinator.compensate(branch.getId());
}
} catch (Exception e) {
// 记录回滚失败,但不中断其他分支回滚
errorLogRepository.save(
ErrorLog.rollbackFailed(xid, branch.getId(), e.getMessage())
);
}
}
// 记录事务回滚
transactionLogRepository.save(
TransactionLog.rollback(xid)
);
contextMap.remove(xid);
}
}
```
## 监控与运维体系
```yaml
# seata-dashboard.yaml - monitoring dashboard deployment (single replica).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: seata-dashboard
  namespace: seata-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: seata-dashboard
  template:
    metadata:
      labels:
        app: seata-dashboard
    spec:
      containers:
        - name: seata-dashboard
          image: seataio/seata-dashboard:1.6.0
          ports:
            - containerPort: 7099
            - containerPort: 9090
          env:
            - name: SEATA_CONFIG_NAME
              value: "file:/seata-dashboard/resources/application.yml"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          volumeMounts:
            # NOTE(review): this mounts the ConfigMap over the whole resources directory,
            # so only the keys of seata-dashboard-config are visible there - confirm the
            # image needs nothing else from that path.
            - name: config-volume
              mountPath: /seata-dashboard/resources
      volumes:
        - name: config-volume
          configMap:
            name: seata-dashboard-config
---
# application.yml for the dashboard.
# NOTE(review): the nesting below was reconstructed from a flattened listing; in
# particular, whether auth/retention/alert sit under seata.dashboard should be confirmed.
# NOTE(review): database and dashboard passwords are stored in plain text in a ConfigMap;
# consider moving them to a Secret.
apiVersion: v1
kind: ConfigMap
metadata:
  name: seata-dashboard-config
  namespace: seata-system
data:
  application.yml: |
    server:
      port: 7099
    spring:
      application:
        name: seata-dashboard
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://mysql.seata-system:3306/seata?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
        username: seata
        password: seata123
    seata:
      dashboard:
        # Transaction monitoring
        enabled: true
        auth:
          enabled: true
          username: admin
          password: seata123
        # Retention policy for monitoring data
        retention:
          transaction: 30
          branch: 30
          lock: 30
        # Alerting
        alert:
          enabled: true
          webhook: http://alert-manager.monitoring.svc.cluster.local:9093/api/v1/alerts
          rules:
            - name: "事务失败率告警"
              expr: "increase(seata_transaction_failed_total[5m]) / increase(seata_transaction_total[5m]) * 100 > 5"
              duration: "1m"
              labels:
                severity: warning
              annotations:
                summary: "事务失败率超过5%"
            - name: "事务处理延迟告警"
              expr: "histogram_quantile(0.95, rate(seata_transaction_duration_seconds_bucket[5m])) > 10"
              duration: "2m"
              labels:
                severity: critical
              annotations:
                summary: "95%事务处理延迟超过10秒"
```
基于Seata的Saga模式和TCC模式为分布式事务提供了完整的解决方案。TCC模式适合短事务和高并发场景,通过业务层面的补偿保证一致性;Saga模式适合长事务和复杂业务流程,通过状态机编排实现最终一致性。在实际工程实践中,应根据业务特性选择合适的模式或组合使用,同时建立完善的监控告警体系,确保分布式事务的可靠执行。随着业务规模的扩大,还需要考虑性能优化、容灾备份等高级特性,构建健壮的分布式事务基础设施。