title: 点赞背后的技术大冒险:分布式事务与SAGA模式
date: 2025/05/07 00:12:40
updated: 2025/05/07 00:12:40
author: cmdragon
excerpt:
在微服务架构中,点赞操作涉及多个服务的数据更新,传统数据库事务在分布式系统中失效,需采用SAGA事务模式。SAGA将事务分解为多个本地事务,通过补偿机制保证最终一致性。每个操作需定义对应的补偿操作,补偿操作需幂等,并记录事务状态和实现超时机制。代码实现包括基础模型定义、事务上下文管理器和核心业务逻辑,测试验证正常和异常流程。生产环境中建议添加事务日志、实现定时补偿任务和服务降级策略。
categories:
tags:
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意:https://tools.cmdragon.cn/
在微服务架构中,点赞这类看似简单的操作可能涉及多个服务的数据更新。假设我们有两个微服务:
当用户点赞时,需要同时更新:
传统数据库事务在分布式系统中失效,我们需要采用SAGA事务模式。这种模式将事务分解为多个本地事务,通过补偿机制保证最终一致性。
正常流程:
[文章服务+1] -> [用户服务创建记录]
异常处理:
[文章服务+1] -> [用户服务失败] -> [文章服务-1补偿]
# 文章服务模型
class Article(Tortoise.Model):
id = fields.IntField(pk=True)
title = fields.CharField(max_length=255)
likes = fields.IntField(default=0)
# 用户服务模型
class UserLikeRecord(Tortoise.Model):
id = fields.UUIDField(pk=True)
user_id = fields.BigIntField()
article_id = fields.BigIntField()
created_at = fields.DatetimeField(auto_now_add=True)
# Pydantic响应模型
class LikeResponse(BaseModel):
article_id: int
current_likes: int
user_record_id: UUID
class SagaTransaction:
def __init__(self):
self.compensation_actions = []
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, traceback):
if exc_type is not None:
await self.compensate()
def add_compensation(self, coro_func, *args):
self.compensation_actions.append((coro_func, args))
async def compensate(self):
for coro_func, args in reversed(self.compensation_actions):
try:
await coro_func(*args)
except Exception as e:
logging.error(f"Compensation failed: {str(e)}")
@app.post("/articles/{article_id}/like", response_model=LikeResponse)
async def like_article(
article_id: int,
user_id: int = Header(..., alias="X-User-ID")
):
async with SagaTransaction() as saga:
# 第一步:更新文章点赞数
article = await Article.get(id=article_id)
original_likes = article.likes
article.likes += 1
await article.save()
# 记录补偿操作(回滚点赞数)
saga.add_compensation(
self.compensate_article_likes,
article_id,
original_likes
)
# 第二步:创建用户点赞记录
try:
record = await UserLikeRecord.create(
user_id=user_id,
article_id=article_id
)
except Exception as e:
# 自动触发补偿流程
raise HTTPException(500, "Like record creation failed")
# 记录补偿操作(删除记录)
saga.add_compensation(
self.compensate_user_record,
record.id
)
return LikeResponse(
article_id=article_id,
current_likes=article.likes,
user_record_id=record.id
)
# 补偿方法示例
async def compensate_article_likes(article_id: int, original_count: int):
article = await Article.get(id=article_id)
article.likes = original_count
await article.save()
async def compensate_user_record(record_id: UUID):
await UserLikeRecord.filter(id=record_id).delete()
async def test_successful_like():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.post(
"/articles/1/like",
headers={"X-User-ID": "123"}
)
assert response.status_code == 200
data = response.json()
assert data["current_likes"] == 1
async def test_failed_transaction():
with patch("UserLikeRecord.create", side_effect=Exception("DB Error")):
response = await ac.post(
"/articles/1/like",
headers={"X-User-ID": "123"}
)
assert response.status_code == 500
# 验证补偿是否执行
article = await Article.get(id=1)
assert article.likes == 0
Q1:为什么补偿操作需要设计为幂等?
A. 提高系统性能
B. 防止重复补偿导致数据错误
C. 减少数据库连接数
D. 满足HTTP协议规范
正确答案:B
解析:网络重试可能导致补偿操作被多次触发,幂等设计确保多次执行结果一致,避免数据不一致。
Q2:以下哪些情况需要触发补偿机制?(多选)
A. 用户服务数据库连接超时
B. 文章不存在返回404错误
C. 用户重复点赞
D. 数据库主从同步延迟
正确答案:A
解析:404属于业务校验错误应在事务开始前检查,重复点赞属于业务逻辑错误,主从同步属于基础架构问题。只有跨服务操作失败需要补偿。
报错1:TransactionManagementError - 事务已关闭
TransactionManagementError: Transaction already closed
原因:异步上下文管理器中过早关闭数据库连接
解决方案:
报错2:HTTP 422 Unprocessable Entity
{
"detail": "Field required"
}
原因:请求体缺少必要字段或类型不匹配
解决方案:
X-User-ID
报错3:TimeoutError - 数据库操作超时
TimeoutError: Connection pool exhausted
原因:数据库连接池不足或查询未优化
解决方案:
TORTOISE_CONFIG["connections"]["default"]["pool_size"] = 20
select_related
优化关联查询class TransactionLog(Tortoise.Model):
transaction_id = fields.UUIDField()
service_name = fields.CharField(max_length=50)
action_type = fields.CharField(max_length=20) # main/compensation
status = fields.CharField(max_length=10) # pending/done/failed
created_at = fields.DatetimeField(auto_now_add=True)
async def check_hanging_transactions():
# 查找超过5分钟未完成的事务
pending = await TransactionLog.filter(
status="pending",
created_at__lt=datetime.now() - timedelta(minutes=5)
)
for transaction in pending:
# 执行补偿逻辑
await retry_compensation(transaction)
(完整示例代码需配合PostgreSQL数据库运行,安装依赖:fastapi uvicorn tortoise-orm httpx python-multipart
)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:点赞背后的技术大冒险:分布式事务与SAGA模式 | cmdragon's Blog
参与评论
手机查看
返回顶部