扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
FastAPI 的请求-响应周期遵循标准 ASGI 协议,可以比作餐厅的点餐流程:
这种同步处理模式在遇到耗时操作时会形成"前厅拥堵",就像厨师做菜时间太长导致顾客排队。
from fastapi import FastAPI
import time
app = FastAPI()
# 同步处理示例
@app.get("/sync-task")
def sync_task():
time.sleep(5) # 模拟耗时操作
return {"status": "completed"}
当访问该接口时,整个服务将阻塞5秒无法处理其他请求。通过 time.sleep(5)
我们可以直观感受到同步处理对性能的影响。
FastAPI 采用双通道设计实现请求-响应与后台任务分离,其工作原理类似于快递柜系统:
graph LR A[客户端请求] --> B{请求类型判断} B -->|即时API调用| C[主线程处理] B -->|后台任务| D[消息队列] C --> E[快递柜格子-响应区] D --> F[异步工作线程] F --> G[快递柜格子-存储区] E --> H[同步取件] G --> I[异步取件/通知] style E fill:#cff,stroke:#333 style G fill:#fcf,stroke:#333
from fastapi import BackgroundTasks
from pydantic import BaseModel
class Notification(BaseModel):
message: str
user_id: int
def send_notification(email: str, message: str):
# 模拟耗时通知操作
print(f"Sending message to {email}: {message}")
@app.post("/notify/{email}")
async def send_email_notification(
email: str,
notification: Notification,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_notification, email, notification.message)
return {"message": "Notification queued"}
代码中 BackgroundTasks
参数会自动注入上下文,通过类型声明实现依赖注入。注意任务函数应当是非异步的常规函数,这与 FastAPI 的线程池执行策略有关。
通过以下表格理解不同场景的技术选型:
场景特征 | BackgroundTasks | Celery |
---|---|---|
任务执行时间 | ≥1分钟 | |
需要任务状态跟踪 | 否 | 是 |
需要失败重试机制 | 基本支持 | 完善支持 |
跨进程/跨机器执行 | 否 | 是 |
# Celery 集成示例(需安装 celery==5.2.7)
from celery import Celery
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1"
)
@celery_app.task
def long_running_task(data: dict):
# 模拟长时间数据处理
time.sleep(120)
return {"result": "processed"}
后台任务中访问数据库时需要特别注意依赖生命周期管理。错误示例:
# 错误用法:直接传递数据库连接
def bad_task(db_conn):
db_conn.execute(...) # 可能使用已关闭的连接
# 正确用法:通过依赖重新获取
def good_task():
db_conn = get_db() # 重新建立连接
db_conn.execute(...)
FastAPI 提供两种错误处理模式:
# 即时错误捕获模式
background_tasks.add_task(handle_errors(safe_task))
# 延迟错误记录模式
background_tasks.add_task(unsafe_task, on_error=error_logger)
推荐使用装饰器模式封装任务函数:
from functools import wraps
def retry(max_attempts=3):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
while attempts
问题1: 当后台任务需要访问数据库连接时,应该如何处理依赖关系?
A. 直接传递数据库连接对象
B. 在任务内部重新创建连接
C. 使用全局单例连接
D. 避免在后台任务操作数据库
答案: B。根据 FastAPI 的依赖生命周期管理,应该在任务内部重新创建数据库连接,直接传递连接对象可能导致使用已关闭的连接(A错误),全局单例(C)可能引发线程安全问题,D选项不符合实际需求。
问题2: 以下哪种情况应该优先选择 Celery 而不是 BackgroundTasks?
A. 需要发送欢迎邮件
B. 用户上传文件后生成缩略图
C. 每月一次的报表生成
D. 实时聊天消息推送
答案: C。每月报表生成属于长时间任务且需要可靠执行,符合 Celery 的应用场景。A、B适合后台任务,D需要实时性不适合异步处理。
错误1: RuntimeError: No context available to access BackgroundTasks
原因: 在非请求上下文中调用后台任务
解决: 检查任务触发位置,确保只在路由处理函数中使用 BackgroundTasks 参数
错误2: TypeError: BackgroundTasks only supports sync functions
原因: 尝试添加异步函数作为后台任务
解决: 将任务函数改为同步函数或使用 Celery 等异步任务队列
错误3: Task was lost but worker is still connected
原因: Celery 任务超时未确认
解决:
task_acks_late=True
配置broker_connection_timeout
参数预防建议:
# 在 Celery 配置中添加健康检查
celery_app.conf.worker_ping_interval = 30 # 30秒一次健康检查
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:FastAPI的请求-响应周期为何需要后台任务分离?
参与评论
手机查看
返回顶部