扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
graph TD User -->|属于| Role Role -->|包含| Permission Permission -->|对应| APIEndpoint
from sqlalchemy import Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import relationship
from pydantic import BaseModel
# 数据库模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
roles = relationship('Role', secondary='user_roles')
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
permissions = relationship('Permission', secondary='role_permissions')
class Permission(Base):
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True)
endpoint = Column(String(50), unique=True)
# 中间关联表
user_roles = Table('user_roles', Base.metadata,
Column('user_id', ForeignKey('users.id')),
Column('role_id', ForeignKey('roles.id')))
role_permissions = Table('role_permissions', Base.metadata,
Column('role_id', ForeignKey('roles.id')),
Column('permission_id', ForeignKey('permissions.id')))
graph TD A[开始] --> B{用户是否已登录?} B -->|否| C[提示用户登录] B -->|是| D{用户角色是否被允许访问?} D -->|否| E[访问被拒绝,提示权限不足] D -->|是| F{请求的资源是否需要特殊权限?} F -->|否| G[访问被允许,加载资源] F -->|是| H{用户是否具有特殊权限?} H -->|否| E H -->|是| G G --> I[结束]
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def check_permission(
required_permission: str,
token: str = Depends(oauth2_scheme)
):
user = await authenticate_user(token)
if not any(perm.endpoint == required_permission
for role in user.roles
for perm in role.permissions):
raise HTTPException(status_code=403, detail="Forbidden")
return user
# 路由使用示例
@app.get("/admin/dashboard")
async def admin_dashboard(user: User = Depends(check_permission("admin_dashboard"))):
return {"message": "Welcome Admin"}
from fastapi import APIRouter
def create_router_with_permissions(endpoint: str, permission: str):
router = APIRouter()
@router.get(f"/{endpoint}")
async def endpoint_handler(user: User = Depends(check_permission(permission))):
# 业务逻辑
return {"data": "secured"}
return router
from fastapi import Request
import datetime
@app.middleware("http")
async def audit_logger(request: Request, call_next):
start_time = datetime.datetime.now()
response = await call_next(request)
process_time = (datetime.datetime.now() - start_time).total_seconds()
audit_log = {
"user": request.state.user.id if hasattr(request.state, 'user') else None,
"endpoint": request.url.path,
"method": request.method,
"status_code": response.status_code,
"timestamp": datetime.datetime.now().isoformat(),
"processing_time": process_time
}
# 异步写入数据库
await save_audit_log(audit_log)
return response
答案:B
解析:403状态码表示权限不足。当用户角色未绑定对应接口权限时,系统会阻止访问,需要检查角色权限配置。
原因:令牌格式错误或签名不匹配
解决方案:
原因:未正确处理匿名用户访问
修复方案:
# 在权限检查函数中添加空值处理
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
fastapi==0.68.0
python-jose[cryptography]==3.3.0
sqlalchemy==1.4.36
uvicorn==0.15.0
部署建议:使用uvicorn启动服务时开启SSL加密:
uvicorn main:app --ssl-keyfile=./key.pem --ssl-certfile=./cert.pem
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:RBAC权限模型如何让API访问控制既安全又灵活?
参与评论
手机查看
返回顶部