分库分表后的分布式事务从 Seata AT 到本地消息表的架构抉择一、跨库转账的 1.7 亿异常订单某支付系统从单库拆分为用户库、账户库、订单库后跨库转账出现经典问题账户扣款成功但订单状态未更新产生 1.7 亿笔扣了钱但没订单的异常数据。根本原因拆库后原本地事务变为跨库操作应用层用先扣款再创单的伪事务实现第二步失败后无回滚机制。分布式事务不是理论问题是拆库后的必然产物。本文从 Seata AT 模式、TCC 模式、本地消息表三种方案出发量化分析各自的性能开销与一致性保证给出不同业务场景的架构抉择依据。二、分布式事务的三种核心模型2.1 Seata AT 模式自动补偿的隐含代价Seata AT 模式通过拦截 SQL 自动生成回滚日志Undo Log在事务失败时自动执行反向 SQL 补偿。sequenceDiagram participant TM as 事务管理器 participant RM1 as 账户库 RM participant RM2 as 订单库 RM participant TC as Seata Server TM-TC: 开启全局事务 (XID) TC--TM: 返回 XID TM-RM1: 扣款 (XID 传播) RM1-RM1: 执行 SQL 记录 Undo Log RM1-TC: 注册分支事务 TC--RM1: 确认 TM-RM2: 创建订单 (XID 传播) RM2-RM2: 执行 SQL 记录 Undo Log RM2-TC: 注册分支事务 TC--RM2: 确认 alt 全部成功 TM-TC: 全局提交 TC-RM1: 提交分支 (异步删除 Undo Log) TC-RM2: 提交分支 else 任一失败 TM-TC: 全局回滚 TC-RM1: 回滚分支 (执行 Undo Log 反向 SQL) TC-RM2: 回滚分支 endAT 模式的隐含代价全局锁分支事务提交前持有全局锁阻止其他事务修改同一行导致并发度下降Undo Log 存储每条 SQL 生成前后镜像存储开销约为业务数据的 2 倍TC 单点Seata Server 是全局事务协调者故障则所有事务阻塞2.2 TCC 模式业务级补偿的精确控制TCCTry-Confirm-Cancel将每个操作拆为三个阶段Try预留资源冻结金额Confirm确认操作扣减冻结金额Cancel取消操作释放冻结金额TCC 无全局锁Try 阶段只预留不提交并发度高于 AT。但每个操作需要编写三套代码开发成本高。2.3 本地消息表最终一致性的工程化方案本地消息表将分布式事务拆解为本地事务 消息投递 幂等消费。flowchart LR A[业务操作 写入消息表] -- B[本地事务提交] B -- C[后台线程扫描消息表] C -- D[发送消息到 MQ] D -- E[消费者处理] E -- F[确认消息完成] F -- G[标记消息已投递]核心约束业务操作和消息写入在同一个本地事务中保证原子性。消息投递失败则重试消费端保证幂等。三、生产级本地消息表实现3.1 消息表设计与核心代码-- 本地消息表 DDL CREATE TABLE outbox_messages ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, -- 消息唯一标识, 用于幂等判断 message_id VARCHAR(64) NOT NULL, -- 目标服务标识 target_service VARCHAR(64) NOT NULL, -- 消息类型, 决定消费端处理逻辑 message_type VARCHAR(128) NOT NULL, -- 消息体 (JSON) payload JSON NOT NULL, -- 消息状态: PENDING / SENT / CONFIRMED / FAILED status ENUM(PENDING, SENT, CONFIRMED, FAILED) NOT NULL DEFAULT PENDING, -- 重试次数 retry_count TINYINT UNSIGNED NOT NULL DEFAULT 0, -- 最大重试次数 max_retries TINYINT UNSIGNED NOT NULL DEFAULT 5, -- 下次重试时间 next_retry_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 更新时间 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY uk_message_id (message_id), KEY idx_status_next_retry (status, next_retry_at), KEY idx_target_service (target_service, status) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;import pymysql import json import uuid import time import threading import logging from typing import Callable, Dict, Optional from dataclasses import dataclass from enum import Enum logger logging.getLogger(__name__) class MessageStatus(Enum): PENDING PENDING SENT SENT CONFIRMED CONFIRMED FAILED FAILED dataclass class OutboxMessage: 消息表记录 id: int 0 message_id: str target_service: str message_type: str payload: dict None status: MessageStatus MessageStatus.PENDING retry_count: int 0 max_retries: int 5 next_retry_at: str class OutboxManager: 本地消息表管理器, 保证业务操作与消息投递的最终一致性 def __init__(self, mysql_config: dict): self.mysql_config mysql_config def _get_connection(self): return pymysql.connect(**self.mysql_config) def execute_with_message(self, business_sqls: list, target_service: str, message_type: str, payload: dict) - str: 在同一个本地事务中执行业务操作并写入消息表 message_id str(uuid.uuid4()) payload_json json.dumps(payload, ensure_asciiFalse) try: with self._get_connection() as conn: with conn.cursor() as cur: # 开启事务 cur.execute(BEGIN) try: # 1. 执行业务 SQL for sql, params in business_sqls: cur.execute(sql, params) # 2. 写入消息表 (同一事务) cur.execute( INSERT INTO outbox_messages (message_id, target_service, message_type, payload, status) VALUES (%s, %s, %s, %s, %s), (message_id, target_service, message_type, payload_json, MessageStatus.PENDING.value) ) # 3. 提交事务: 业务操作和消息写入要么同时成功, 要么同时回滚 conn.commit() logger.info(f业务操作消息写入成功, message_id{message_id}) return message_id except Exception as e: conn.rollback() logger.error(f事务执行失败, 已回滚: {e}) raise except pymysql.err.OperationalError as e: logger.error(f数据库连接失败: {e}) raise def fetch_pending_messages(self, batch_size: int 100) - list: 扫描待投递的消息 sql SELECT id, message_id, target_service, message_type, payload, retry_count, max_retries FROM outbox_messages WHERE status PENDING AND next_retry_at NOW() ORDER BY id ASC LIMIT %s FOR UPDATE SKIP LOCKED try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql, (batch_size,)) columns [desc[0] for desc in cur.description] return [dict(zip(columns, row)) for row in cur.fetchall()] except Exception as e: logger.error(f扫描消息失败: {e}) return [] def mark_sent(self, message_id: str): 标记消息已投递 sql UPDATE outbox_messages SET status SENT, updated_at NOW() WHERE message_id %s AND status PENDING try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql, (message_id,)) except Exception as e: logger.error(f标记消息失败: {e}) def mark_confirmed(self, message_id: str): 标记消息已确认消费 sql UPDATE outbox_messages SET status CONFIRMED, updated_at NOW() WHERE message_id %s try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql, (message_id,)) except Exception as e: logger.error(f确认消息失败: {e}) def mark_retry(self, message_id: str, retry_count: int, max_retries: int): 标记消息需要重试 if retry_count max_retries: # 超过最大重试次数, 标记为失败 sql UPDATE outbox_messages SET status FAILED, updated_at NOW() WHERE message_id %s logger.error(f消息 {message_id} 超过最大重试次数, 标记为 FAILED) else: # 指数退避: 2^retry_count 秒后重试 delay_seconds 2 ** retry_count sql UPDATE outbox_messages SET retry_count %s, next_retry_at DATE_ADD(NOW(), INTERVAL %s SECOND), updated_at NOW() WHERE message_id %s try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql, (retry_count 1, delay_seconds, message_id)) except Exception as e: logger.error(f重试标记失败: {e}) return try: with self._get_connection() as conn: with conn.cursor() as cur: cur.execute(sql, (message_id,)) except Exception as e: logger.error(f失败标记失败: {e}) class OutboxDispatcher: 消息投递调度器, 后台线程扫描并投递消息 def __init__(self, outbox_manager: OutboxManager, send_func: Callable[[dict], bool], poll_interval: float 1.0): self.outbox outbox_manager self.send_func send_func # 实际发送消息的函数 self.poll_interval poll_interval self._stop_event threading.Event() def start(self): 启动后台投递线程 def _worker(): while not self._stop_event.is_set(): try: messages self.outbox.fetch_pending_messages(batch_size50) for msg in messages: success self.send_func({ message_id: msg[message_id], target_service: msg[target_service], message_type: msg[message_type], payload: json.loads(msg[payload]), }) if success: self.outbox.mark_sent(msg[message_id]) else: self.outbox.mark_retry( msg[message_id], msg[retry_count], msg[max_retries], ) except Exception as e: logger.error(f投递循环异常: {e}) self._stop_event.wait(self.poll_interval) thread threading.Thread(target_worker, daemonTrue) thread.start() logger.info(消息投递调度器已启动) def stop(self): self._stop_event.set()3.2 三种方案的性能对比维度Seata ATTCC本地消息表一致性强一致全局锁强一致业务锁最终一致写入延迟20-50ms全局锁等待10-30msTry 阶段2-5ms本地事务吞吐量低全局锁争用中高开发成本低自动补偿高三套代码中运维复杂度高TC 集群中低适用场景强一致金融交易高并发资金操作大多数业务场景四、分布式事务方案的架构权衡4.1 Seata AT 的全局锁瓶颈AT 模式的全局锁在分支事务提交前持有其他事务修改同一行必须等待。高并发场景下全局锁等待超时是主要故障来源。seata.lock.retryInterval和seata.lock.retryTimes控制锁等待策略但增大重试次数会延长事务持有时间加剧锁争用。4.2 TCC 的空回滚与悬挂TCC 模式有两个经典问题空回滚Try 请求因网络超时未到达TC 触发 CancelCancel 需要处理未 Try 就 Cancel的情况悬挂Cancel 先于 Try 到达Cancel 执行后 Try 才到达Try 预留的资源无人释放解决方案在 Try 阶段插入一条记录标记已 TryCancel 阶段检查该标记。但这增加了额外的数据库操作。4.3 本地消息表的消息堆积消息投递失败后指数退避重试如果消费端长时间不可用消息表会持续增长。需要设置max_retries上限超过后标记为 FAILED 并告警人工介入。FAILED 消息需要定期清理或归档避免消息表膨胀。4.4 禁用场景Seata AT高并发写入QPS 5000全局锁成为瓶颈TCC业务操作无法拆分为 Try/Confirm/Cancel 三阶段如调用第三方 API本地消息表要求强一致性的金融交易最终一致的延迟不可接受五、总结分布式事务方案的选择本质是一致性、性能、开发成本三者的权衡。Seata AT 提供强一致但牺牲并发性能TCC 提供精确控制但开发成本高本地消息表提供最终一致但延迟可控。生产环境的务实选择90% 的业务场景用本地消息表最终一致 高吞吐10% 的强一致场景用 TCC精确控制 业务锁Seata AT 仅用于快速验证阶段。任何分布式事务方案都不是银弹关键在于识别业务的一致性需求选择匹配的方案而非追求最强一致性。