分布式事务架构实践:Seata Saga与TCC模式的工程实现

# 分布式事务架构实践:Seata Saga与TCC模式的工程实现


在微服务架构成为主流的今天,分布式事务管理成为保证数据一致性的关键技术挑战。基于Seata框架的Saga模式和TCC模式,为不同业务场景提供了灵活的事务解决方案,特别是在金融级应用中对高可用和最终一致性的要求极为严格。


## Seata架构概览与部署


Seata提供AT、TCC、Saga和XA四种事务模式,支持多种部署方式,满足不同复杂度的业务需求。


```yaml

# seata-server-deployment.yaml - 高可用部署配置

apiVersion: apps/v1

kind: Deployment

metadata:

  name: seata-server

  namespace: seata-system

  labels:

    app: seata-server

spec:

  replicas: 3

  selector:

    matchLabels:

      app: seata-server

  strategy:

    type: RollingUpdate

    rollingUpdate:

      maxSurge: 1

      maxUnavailable: 0

  template:

    metadata:

      labels:

        app: seata-server

    spec:

      containers:

      - name: seata-server

        image: seataio/seata-server:1.6.0

        ports:

        - containerPort: 8091

          name: http

        - containerPort: 7091

          name: rpc

        env:

        - name: SEATA_CONFIG_NAME

          value: "file:/seata-server/resources/registry"

        - name: STORE_MODE

          value: "db"

        - name: SERVER_NODE

          valueFrom:

            fieldRef:

              fieldPath: metadata.name

        resources:

          requests:

            memory: "1Gi"

            cpu: "500m"

          limits:

            memory: "2Gi"

            cpu: "1000m"

        readinessProbe:

          httpGet:

            path: /health

            port: 8091

          initialDelaySeconds: 30

          periodSeconds: 10

        livenessProbe:

          httpGet:

            path: /health

            port: 8091

          initialDelaySeconds: 60

          periodSeconds: 30

        volumeMounts:

        - name: config-volume

          mountPath: /seata-server/resources

      volumes:

      - name: config-volume

        configMap:

          name: seata-server-config

---

apiVersion: v1

kind: ConfigMap

metadata:

  name: seata-server-config

  namespace: seata-system

data:

  registry.conf: |

    registry {

      type = "nacos"

      nacos {

        application = "seata-server"

        serverAddr = "nacos-headless.nacos.svc.cluster.local:8848"

        namespace = "seata"

        cluster = "default"

        username = "nacos"

        password = "nacos"

      }

    }

    

    config {

      type = "nacos"

      nacos {

        serverAddr = "nacos-headless.nacos.svc.cluster.local:8848"

        namespace = "seata"

        group = "SEATA_GROUP"

        username = "nacos"

        password = "nacos"

        dataId = "seataServer.properties"

      }

    }

```


## TCC模式核心实现


TCC(Try-Confirm-Cancel)模式通过业务逻辑层面的补偿机制实现最终一致性,适合高并发短事务场景。


```java

// TCC模式银行转账示例

package com.example.bank.transaction;


import io.seata.rm.tcc.api.BusinessActionContext;

import io.seata.rm.tcc.api.BusinessActionContextParameter;

import io.seata.rm.tcc.api.LocalTCC;

import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import org.springframework.stereotype.Service;

import org.springframework.transaction.annotation.Transactional;


import java.math.BigDecimal;


@LocalTCC

public interface TransferTccService {

    

    @TwoPhaseBusinessAction(

        name = "transferTccService", 

        commitMethod = "commit", 

        rollbackMethod = "rollback"

    )

    boolean prepareTransfer(

        BusinessActionContext actionContext,

        @BusinessActionContextParameter(paramName = "fromAccount") String fromAccount,

        @BusinessActionContextParameter(paramName = "toAccount") String toAccount,

        @BusinessActionContextParameter(paramName = "amount") BigDecimal amount

    );

    

    boolean commit(BusinessActionContext actionContext);

    

    boolean rollback(BusinessActionContext actionContext);

}


@Service

class TransferTccServiceImpl implements TransferTccService {

    

    private final AccountRepository accountRepository;

    private final FrozenBalanceRepository frozenRepository;

    

    @Override

    @Transactional

    public boolean prepareTransfer(BusinessActionContext actionContext, 

                                   String fromAccount, 

                                   String toAccount, 

                                   BigDecimal amount) {

        

        // 检查账户状态

        Account account = accountRepository.findByAccountNo(fromAccount);

        if (account == null) {

            throw new RuntimeException("转出账户不存在");

        }

        

        if (account.getBalance().compareTo(amount) < 0) {

            throw new RuntimeException("账户余额不足");

        }

        

        // 冻结转出账户资金

        BigDecimal frozenBalance = account.getFrozenBalance() != null ? 

                                 account.getFrozenBalance() : BigDecimal.ZERO;

        account.setFrozenBalance(frozenBalance.add(amount));

        accountRepository.save(account);

        

        // 记录冻结信息

        FrozenBalance frozen = new FrozenBalance();

        frozen.setXid(actionContext.getXid());

        frozen.setAccountNo(fromAccount);

        frozen.setAmount(amount);

        frozen.setStatus(FrozenStatus.TRY);

        frozen.setCreatedAt(new Date());

        frozenRepository.save(frozen);

        

        // 记录事务上下文

        actionContext.addActionContext("frozenId", frozen.getId());

        actionContext.addActionContext("fromAccount", fromAccount);

        actionContext.addActionContext("toAccount", toAccount);

        actionContext.addActionContext("amount", amount);

        

        return true;

    }

    

    @Override

    @Transactional

    public boolean commit(BusinessActionContext actionContext) {

        String xid = actionContext.getXid();

        String fromAccount = (String) actionContext.getActionContext("fromAccount");

        String toAccount = (String) actionContext.getActionContext("toAccount");

        BigDecimal amount = (BigDecimal) actionContext.getActionContext("amount");

        <"fbb.j9k5.org.cn"><"tbh.j9k5.org.cn"><"opj.j9k5.org.cn">

        // 查询冻结记录

        FrozenBalance frozen = frozenRepository.findByXidAndAccountNo(xid, fromAccount);

        if (frozen == null || frozen.getStatus() != FrozenStatus.TRY) {

            throw new RuntimeException("冻结记录不存在或状态异常");

        }

        

        // 执行实际转账

        Account from = accountRepository.findByAccountNo(fromAccount);

        Account to = accountRepository.findByAccountNo(toAccount);

        

        // 扣减转出账户余额和冻结金额

        from.setBalance(from.getBalance().subtract(amount));

        from.setFrozenBalance(from.getFrozenBalance().subtract(amount));

        accountRepository.save(from);

        

        // 增加转入账户余额

        to.setBalance(to.getBalance().add(amount));

        accountRepository.save(to);

        

        // 更新冻结记录状态

        frozen.setStatus(FrozenStatus.CONFIRM);

        frozen.setConfirmedAt(new Date());

        frozenRepository.save(frozen);

        

        // 记录转账流水

        TransferRecord record = new TransferRecord();

        record.setXid(xid);

        record.setFromAccount(fromAccount);

        record.setToAccount(toAccount);

        record.setAmount(amount);

        record.setStatus(TransferStatus.SUCCESS);

        record.setCreatedAt(new Date());

        transferRecordRepository.save(record);

        

        return true;

    }

    

    @Override

    @Transactional

    public boolean rollback(BusinessActionContext actionContext) {

        String xid = actionContext.getXid();

        String fromAccount = (String) actionContext.getActionContext("fromAccount");

        

        // 查询冻结记录

        FrozenBalance frozen = frozenRepository.findByXidAndAccountNo(xid, fromAccount);

        if (frozen == null) {

            return true; // 无冻结记录,无需回滚

        }

        

        if (frozen.getStatus() == FrozenStatus.CANCEL) {

            return true; // 已回滚过

        }

        

        // 解冻资金

        Account account = accountRepository.findByAccountNo(fromAccount);

        if (account != null) {

            account.setFrozenBalance(account.getFrozenBalance().subtract(frozen.getAmount()));

            accountRepository.save(account);

        }

        

        // 更新冻结记录状态

        frozen.setStatus(FrozenStatus.CANCEL);

        frozen.setCanceledAt(new Date());

        frozenRepository.save(frozen);

        

        // 记录回滚日志

        CompensateLog log = new CompensateLog();

        log.setXid(xid);

        log.setServiceName("transferTccService");

        log.setAction("rollback");

        log.setStatus(CompensateStatus.SUCCESS);

        log.setCreatedAt(new Date());

        compensateLogRepository.save(log);

        

        return true;

    }

}

```


## Saga模式编排引擎


Saga模式适用于长事务场景,通过状态机编排服务调用,支持正向服务和补偿服务定义。


```java

// Saga状态机编排示例

package com.example.order.saga;


import io.seata.saga.engine.StateMachineEngine;

import io.seata.saga.statelang.domain.StateMachineInstance;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;


import java.util.HashMap;

import java.util.Map;


@Service

public class OrderSagaService {

    

    @Autowired

    private StateMachineEngine stateMachineEngine;

    

    public StateMachineInstance createOrder(String userId, List items) {

        Map startParams = new HashMap<>();

        startParams.put("userId", userId);

        startParams.put("orderItems", items);

        startParams.put("totalAmount", calculateTotalAmount(items));

        

        // 启动Saga状态机

        StateMachineInstance instance = stateMachineEngine.start(

            "orderCreationSaga", 

            null, 

            startParams

        );

        

        return instance;

    }

}


// Saga状态机定义 JSON格式

{

  "Name": "orderCreationSaga",

  "Version": "1.0.0",

  "Comment": "订单创建Saga流程",

  "StartState": "ValidateOrder",

  "States": {

    "ValidateOrder": {

      "Type": "ServiceTask",

      "ServiceName": "orderService",

      "ServiceMethod": "validateOrder",

      "CompensateState": "CompensateValidate",

      "Next": "ReserveInventory",

      "Input": [

        "${userId}",

        "${orderItems}"

      ],

      "Output": {

        "validationResult": "$.validationResult",

        "orderId": "#uuid"

      },

      "Status": {

        "#root == true": "SU",

        "#root == false": "FA"

      }

    },

    

    "ReserveInventory": {

      "Type": "ServiceTask",

      "ServiceName": "inventoryService",

      "ServiceMethod": "reserveInventory",

      "CompensateState": "CompensateInventory",

      "Next": "ProcessPayment",

      "Input": [

        "${orderId}",

        "${orderItems}"

      ],

      "Output": {

        "inventoryReserved": "$.success"

      },

      "Catch": [

        {

          "Exceptions": ["java.lang.Exception"],

          "Next": "CompensationTrigger"

        }

      ]

    },

    

    "ProcessPayment": {

      "Type": "ServiceTask",

      "ServiceName": "paymentService",

      "ServiceMethod": "processPayment",

      "CompensateState": "CompensatePayment",

      "Next": "ConfirmOrder",

      "Input": [

        "${orderId}",

        "${userId}",

        "${totalAmount}"

      ],

      "Output": {

        "paymentId": "$.paymentId",

        "paymentStatus": "$.status"

      },

      "Retry": [

        {

          "Exceptions": ["PaymentException"],

          "IntervalSeconds": 1,

          "MaxAttempts": 3,

          "BackoffRate": 2.0

        }

      ]

    },

    

    "ConfirmOrder": {

      "Type": "ServiceTask",

      "ServiceName": "orderService",

      "ServiceMethod": "confirmOrder",

      "Next": "SuccessEnd",

      "Input": [

        "${orderId}"

      ]

    },

    

    "CompensateValidate": {

      "Type": "ServiceTask",

      "ServiceName": "orderService",

      "ServiceMethod": "compensateValidate",

      "Next": "FailEnd"

    },

    

    "CompensateInventory": {

      "Type": "ServiceTask",

      "ServiceName": "inventoryService",

      "ServiceMethod": "releaseInventory",

      "Next": "CompensateValidate"

    },

    

    "CompensatePayment": {

      "Type": "ServiceTask",

      "ServiceName": "paymentService",

      "ServiceMethod": "refundPayment",

      "Next": "CompensateInventory"

    },

    

    "CompensationTrigger": {

      "Type": "CompensationTrigger",

      "Next": "FailEnd"

    },

    

    "SuccessEnd": {

      "Type": "Succeed"

    },

    

    "FailEnd": {

      "Type": "Fail",

      "ErrorCode": "ORDER_CREATE_FAILED",

      "Message": "订单创建失败"

    }

  }

}

```


## 混合模式应用实践


在实际业务中,往往需要结合TCC和Saga模式的优点,形成混合事务解决方案。


```java

// 混合事务模式实现

package com.example.ecommerce.transaction;

<"efq.j9k5.org.cn"><"bft.j9k5.org.cn"><"ytm.j9k5.org.cn">

@Service

public class HybridTransactionService {

    

    private final TccTransactionService tccService;

    private final SagaTransactionService sagaService;

    private final TransactionCoordinator coordinator;

    

    public TransactionResult placeOrder(OrderRequest request) {

        // 使用TCC处理库存预留(短事务,需要强一致性)

        TccResult inventoryResult = tccService.reserveInventory(

            request.getItems(), 

            request.getOrderId()

        );

        

        if (!inventoryResult.isSuccess()) {

            return TransactionResult.fail("库存预留失败");

        }

        

        try {

            // 使用Saga处理后续长流程

            SagaInstance sagaInstance = sagaService.startOrderSaga(request);

            

            // 等待Saga完成或超时

            return waitForSagaCompletion(sagaInstance);

            

        } catch (Exception e) {

            // Saga失败,回滚TCC预留

            tccService.cancelReservation(inventoryResult.getXid());

            throw e;

        }

    }

    

    private TransactionResult waitForSagaCompletion(SagaInstance instance) {

        long startTime = System.currentTimeMillis();

        long timeout = 30000; // 30秒超时

        

        while (System.currentTimeMillis() - startTime < timeout) {

            SagaStatus status = coordinator.getSagaStatus(instance.getId());

            

            switch (status) {

                case SUCCEEDED:

                    return TransactionResult.success(instance.getOutput());

                case FAILED:

                    return TransactionResult.fail(instance.getErrorMessage());

                case COMPENSATED:

                    return TransactionResult.fail("事务已回滚");

                default:

                    try {

                        Thread.sleep(500);

                    } catch (InterruptedException e) {

                        Thread.currentThread().interrupt();

                        return TransactionResult.fail("等待中断");

                    }

            }

        }

        

        return TransactionResult.fail("事务处理超时");

    }

}


// 事务协调器实现

@Component

public class TransactionCoordinator {

    

    private final Map contextMap = 

        new ConcurrentHashMap<>();

    

    public String beginTransaction(String businessType) {

        String xid = UUID.randomUUID().toString();

        TransactionContext context = new TransactionContext(xid, businessType);

        contextMap.put(xid, context);

        

        // 记录事务开始

        transactionLogRepository.save(

            TransactionLog.begin(xid, businessType)

        );

        

        return xid;

    }

    

    public void registerBranch(String xid, String branchId, 

                              BranchType branchType) {

        TransactionContext context = contextMap.get(xid);

        if (context != null) {

            context.addBranch(branchId, branchType);

            

            // 记录分支注册

            branchLogRepository.save(

                BranchLog.register(xid, branchId, branchType)

            );

        }

    }

    

    public void commit(String xid) {

        TransactionContext context = contextMap.get(xid);

        if (context == null) {

            throw new RuntimeException("事务上下文不存在");

        }

        

        // 执行两阶段提交

        try {

            // 第一阶段:准备

            for (BranchInfo branch : context.getBranches()) {

                if (branch.getType() == BranchType.TCC) {

                    tccCoordinator.prepare(branch.getId());

                }

            }

            

            // 第二阶段:提交

            for (BranchInfo branch : context.getBranches()) {

                if (branch.getType() == BranchType.TCC) {

                    tccCoordinator.commit(branch.getId());

                } else if (branch.getType() == BranchType.SAGA) {

                    sagaCoordinator.confirm(branch.getId());

                }

            }

            

            // 记录事务完成

            transactionLogRepository.save(

                TransactionLog.commit(xid)

            );

            

            contextMap.remove(xid);

            

        } catch (Exception e) {

            // 失败处理

            rollback(xid);

            throw e;

        }

    }

    

    public void rollback(String xid) {

        TransactionContext context = contextMap.get(xid);

        if (context == null) {

            throw new RuntimeException("事务上下文不存在");

        }

        

        // 按注册逆序执行回滚

        List branches = new ArrayList<>(context.getBranches());

        Collections.reverse(branches);

        

        for (BranchInfo branch : branches) {

            try {

                if (branch.getType() == BranchType.TCC) {

                    tccCoordinator.rollback(branch.getId());

                } else if (branch.getType() == BranchType.SAGA) {

                    sagaCoordinator.compensate(branch.getId());

                }

            } catch (Exception e) {

                // 记录回滚失败,但不中断其他分支回滚

                errorLogRepository.save(

                    ErrorLog.rollbackFailed(xid, branch.getId(), e.getMessage())

                );

            }

        }

        

        // 记录事务回滚

        transactionLogRepository.save(

            TransactionLog.rollback(xid)

        );

        

        contextMap.remove(xid);

    }

}

```


## 监控与运维体系


```yaml

# seata-dashboard.yaml - 监控面板部署

apiVersion: apps/v1

kind: Deployment

metadata:

  name: seata-dashboard

  namespace: seata-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: seata-dashboard

  template:

    metadata:

      labels:

        app: seata-dashboard

    spec:

      containers:

      - name: seata-dashboard

        image: seataio/seata-dashboard:1.6.0

        ports:

        - containerPort: 7099

        - containerPort: 9090

        env:

        - name: SEATA_CONFIG_NAME

          value: "file:/seata-dashboard/resources/application.yml"

        resources:

          requests:

            memory: "512Mi"

            cpu: "250m"

          limits:

            memory: "1Gi"

            cpu: "500m"

        volumeMounts:

        - name: config-volume

          mountPath: /seata-dashboard/resources

      volumes:

      - name: config-volume

        configMap:

          name: seata-dashboard-config

---

apiVersion: v1

kind: ConfigMap

metadata:

  name: seata-dashboard-config

  namespace: seata-system

data:

  application.yml: |

    server:

      port: 7099

    

    spring:

      application:

        name: seata-dashboard

      

      datasource:

        driver-class-name: com.mysql.cj.jdbc.Driver

        url: jdbc:mysql://mysql.seata-system:3306/seata?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false

        username: seata

        password: seata123

    

    seata:

      dashboard:

        # 事务监控配置

        enabled: true

        auth:

          enabled: true

          username: admin

          password: seata123

        

        # 监控数据保留策略

        retention:

          transaction: 30

          branch: 30

          lock: 30

        

        # 告警配置

        alert:

          enabled: true

          webhook: http://alert-manager.monitoring.svc.cluster.local:9093/api/v1/alerts

          rules:

            - name: "事务失败率告警"

              expr: "increase(seata_transaction_failed_total[5m]) / increase(seata_transaction_total[5m]) * 100 > 5"

              duration: "1m"

              labels:

                severity: warning

              annotations:

                summary: "事务失败率超过5%"

            

            - name: "事务处理延迟告警"

              expr: "histogram_quantile(0.95, rate(seata_transaction_duration_seconds_bucket[5m])) > 10"

              duration: "2m"

              labels:

                severity: critical

              annotations:

                summary: "95%事务处理延迟超过10秒"

```


基于Seata的Saga模式和TCC模式为分布式事务提供了完整的解决方案。TCC模式适合短事务和高并发场景,通过业务层面的补偿保证一致性;Saga模式适合长事务和复杂业务流程,通过状态机编排实现最终一致性。在实际工程实践中,应根据业务特性选择合适的模式或组合使用,同时建立完善的监控告警体系,确保分布式事务的可靠执行。随着业务规模的扩大,还需要考虑性能优化、容灾备份等高级特性,构建健壮的分布式事务基础设施。


请使用浏览器的分享功能分享到微信等