title: 如何在FastAPI中打造一个既安全又灵活的权限管理系统?
date: 2025/06/16 08:17:05
updated: 2025/06/16 08:17:05
author: cmdragon
excerpt:
FastAPI权限系统通过依赖注入实现三级验证:身份认证、角色验证和权限校验。数据库模型包括用户、角色和权限注册表,支持动态管理权限。权限验证依赖项通过检查用户角色权限进行访问控制,动态路由权限注册允许实时添加权限。中间件实时检查用户权限,确保访问安全。系统处理常见报错如422 Unprocessable Entity和数据库连接超时,确保稳定运行。
categories:
tags:
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
# 所需环境配置(运行前请安装)
# fastapi==0.95.0
# uvicorn==0.21.1
# python-multipart==0.0.6
# sqlalchemy==1.4.46
# pydantic==1.10.7
# passlib==1.7.4
权限系统的本质是请求过滤机制,FastAPI 通过依赖注入系统实现层级验证。当请求到达时,会经历:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from databases import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
hashed_password = Column(String(300))
is_active = Column(Boolean, default=True)
role_id = Column(Integer, ForeignKey("roles.id"))
role = relationship("Role", back_populates="users")
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(20), unique=True)
permissions = Column(String(500)) # 存储逗号分隔的权限标识
users = relationship("User", back_populates="role")
class PermissionRegistry(Base):
__tablename__ = "permission_registry"
id = Column(Integer, primary_key=True)
endpoint = Column(String(100)) # 路由路径
method = Column(String(10)) # HTTP方法
perm_code = Column(String(50)) # 权限标识
from fastapi import Depends, HTTPException, status
from pydantic import BaseModel
class PermissionValidator:
def __init__(self, required_perm: str):
self.required_perm = required_perm
async def __call__(self,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)):
# 获取角色关联的权限
role_perms = current_user.role.permissions.split(",")
# 验证权限是否存在
if self.required_perm not in role_perms:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有访问权限"
)
# 记录审计日志(示例)
audit_log = AuditLog(
user_id=current_user.id,
action=f"访问需要 {self.required_perm} 权限的端点"
)
db.add(audit_log)
db.commit()
# 使用示例
@app.get("/admin/dashboard")
async def admin_dashboard(
perm_check: bool = Depends(PermissionValidator("admin_dashboard"))):
return {"message": "欢迎来到管理面板"}
class PermissionRegistration(BaseModel):
endpoint: str
methods: List[str]
perm_code: str
@app.post("/manage/permissions")
async def register_permission(
perm_data: PermissionRegistration,
db: Session = Depends(get_db)
):
for method in perm_data.methods:
existing = db.query(PermissionRegistry).filter_by(
endpoint=perm_data.endpoint,
method=method
).first()
if not existing:
new_perm = PermissionRegistry(
endpoint=perm_data.endpoint,
method=method,
perm_code=perm_data.perm_code
)
db.add(new_perm)
db.commit()
return {"status": "权限注册成功"}
@app.middleware("http")
async def dynamic_permission_check(request: Request, call_next):
# 跳过非业务端点
if request.url.path.startswith(("/docs", "/redoc")):
return await call_next(request)
# 查询权限注册表
db = SessionLocal()
perm_record = db.query(PermissionRegistry).filter_by(
endpoint=request.url.path,
method=request.method
).first()
if perm_record:
# 验证用户权限
current_user = await get_current_user(request)
if perm_record.perm_code not in current_user.role.permissions.split(","):
return JSONResponse(
status_code=403,
content={"detail": "权限不足"}
)
response = await call_next(request)
return response
答案:B) 403。系统在权限验证阶段发现用户权限不足时,会返回403 Forbidden状态码。401表示未认证,404是资源不存在,500是服务器内部错误。
答案:C) 同时实现A和B。应该在路由定义中添加类似Depends(PermissionValidator("perm_management"))的依赖,同时在用户角色系统中设置管理员专属权限。
422 Unprocessable Entity
原因:请求体不符合Pydantic模型验证
解决:检查字段类型是否正确,添加缺失的必填字段
AttributeError: 'NoneType' has no attribute 'permissions'
原因:用户角色未正确关联
解决:检查数据库中的角色关联关系,确保每个用户都有对应的角色
数据库连接超时
预防:使用SQLAlchemy的连接池配置
SQLALCHEMY_DATABASE_URL = "postgresql://user:pass@localhost/dbname?connect_timeout=10"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中打造一个既安全又灵活的权限管理系统? | cmdragon's Blog
参与评论
手机查看
返回顶部