扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
关键设计特点:
# requirements.txt
fastapi==0.103.2
pydantic==2.5.3
uvicorn==0.23.2
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, EmailStr
app = FastAPI()
class UserRegister(BaseModel):
username: str
email: EmailStr
password: str
async def send_welcome_email(email: str):
"""模拟邮件发送(实际应使用SMTP库)"""
print(f"Sending email to {email}")
# 这里可以添加真正的邮件发送逻辑
@app.post("/register", status_code=201)
async def create_user(
user: UserRegister,
background_tasks: BackgroundTasks
):
"""用户注册接口"""
# 主逻辑处理
print(f"Creating user: {user.username}")
# 添加后台任务
background_tasks.add_task(
send_welcome_email,
user.email
)
return {"message": "User created successfully"}
async def cleanup_failed_registration(user_id: int):
"""注册失败后的数据回滚"""
async with AsyncSessionLocal() as session:
await session.execute(
delete(User).where(User.id == user_id)
)
await session.commit()
@app.post("/register")
async def register_user(
user: UserRegister,
background_tasks: BackgroundTasks
):
try:
# 数据库操作...
except Exception as e:
background_tasks.add_task(
cleanup_failed_registration,
new_user.id
)
raise HTTPException(...)
def task_wrapper(func):
"""任务执行监控装饰器"""
async def wrapper(*args, **kwargs):
try:
print(f"Starting task {func.__name__}")
await func(*args, **kwargs)
print(f"Task {func.__name__} completed")
except Exception as e:
print(f"Task failed: {str(e)}")
# 可添加重试逻辑
return wrapper
@app.post("/batch-process")
async def batch_processing(
background_tasks: BackgroundTasks
):
background_tasks.add_task(
task_wrapper(process_data)
)
background_tasks.add_task(
task_wrapper(generate_reports)
)
from fastapi.concurrency import run_in_threadpool
async def resource_intensive_task():
# CPU密集型任务示例
await run_in_threadpool(
lambda: heavy_computation()
)
Q1: 当后台任务需要访问数据库时,应该如何正确处理数据库会话?
A) 直接使用主请求的会话
B) 每个任务创建独立会话
C) 使用全局共享会话
正确答案:B
解析:每个后台任务应该创建独立的数据库会话,因为主请求的会话在响应返回后可能已关闭。推荐在任务内部使用上下文管理器创建新会话。
报错现象:
RuntimeError: No response returned.
原因分析:
在后台任务中直接返回了响应对象
解决方案:
预防建议:
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:FastAPI后台任务:是时候让你的代码飞起来了吗?
参与评论
手机查看
返回顶部