扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
基于角色的访问控制(RBAC)是保护系统资源的经典模型,其核心包含三个要素:
核心依赖包及版本要求:
fastapi == 0.95
.2
uvicorn == 0.22
.0
python - jose[cryptography] == 3.3
.0
passlib[bcrypt] == 1.7
.4
安装命令:
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]
使用 Pydantic 和 SQLAlchemy 构建领域模型:
from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, Table, ForeignKey
class PermissionBase(BaseModel):
code: str # 权限编码,如 "order:delete"
description: str
class RoleCreate(BaseModel):
name: str
permission_ids: list[int]
# SQLAlchemy 模型
user_role = Table(
'user_role', Base.metadata,
Column('user_id', ForeignKey('users.id')),
Column('role_id', ForeignKey('roles.id'))
)
role_permission = Table(
'role_permission', Base.metadata,
Column('role_id', ForeignKey('roles.id')),
Column('permission_id', ForeignKey('permissions.id'))
)
JWT 令牌签发与验证:
from jose import JWTError, jwt
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict) -> str:
expire = datetime.utcnow() + timedelta(hours=1)
return jwt.encode(
{**data, "exp": expire},
SECRET_KEY,
algorithm=ALGORITHM
)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
try:
payload = jwt.decode(token, SECRET_KEY, [ALGORITHM])
return await UserService.get_user(payload["sub"])
except JWTError:
raise HTTPException(401, "Invalid token")
动态权限依赖注入:
def check_permission(permission_code: str):
async def _permission_checker(
user: User = Depends(get_current_user)
):
if not any(p.code == permission_code for p in user.permissions):
raise HTTPException(403, "Permission denied")
return Depends(_permission_checker)
# 在路由中使用
@app.delete("/orders/{order_id}")
async def delete_order(
order_id: str,
_=check_permission("order:delete")
):
return OrderService.delete_order(order_id)
密码存储安全处理:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
防止暴力破解攻击:
from fastapi import Request
from fastapi.middleware import Middleware
class RateLimiter:
def __init__(self, times: int, seconds: int):
self.times = times
self.seconds = seconds
async def __call__(self, request: Request):
client_ip = request.client.host
# 使用 Redis 记录请求次数
current = await redis.incr(client_ip)
if current > self.times:
raise HTTPException(429, "Too many requests")
await redis.expire(client_ip, self.seconds)
权限验证测试用例:
from fastapi.testclient import TestClient
def test_admin_access():
client = TestClient(app)
# 普通用户令牌
token = create_test_token(user_type="user")
response = client.delete(
"/products/123",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 403
# 管理员令牌
admin_token = create_test_token(user_type="admin")
response = client.delete(
"/products/123",
headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 200
错误 1:401 Unauthorized
{
"detail": "Could not validate credentials"
}
解决方法:
错误 2:403 Forbidden
{
"detail": "Permission denied"
}
预防建议:
问题 1: 为什么要在密码哈希中使用 salt?
答案: Salt 随机值能有效防止彩虹表攻击,即使相同密码也会生成不同的哈希值
问题 2: 如何实现动态权限管理?
答案: 应该建立权限管理系统接口,允许管理员动态配置角色与权限的对应关系
问题 3: JWT 令牌为什么要设置过期时间?
答案: 缩短令牌有效期可以降低令牌泄露风险,建议结合 refresh token 使用
所有代码在以下环境验证通过:
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用 FastAPI 和 RBAC 打造坚不可摧的电商堡垒?
参与评论
手机查看
返回顶部