title: FastAPI安全异常处理:从401到422的奇妙冒险
date: 2025/06/05 21:06:31
updated: 2025/06/05 21:06:31
author: cmdragon
excerpt:
FastAPI安全异常处理核心原理与实践包括认证失败的标准HTTP响应规范、令牌异常的特殊场景处理以及完整示例代码。HTTP状态码选择原则建议使用401、403和422,错误响应结构应统一。JWT令牌异常分为签名篡改、过期和格式错误,推荐状态码为401。通过依赖注入实现令牌校验,并采用双令牌策略实现令牌刷新机制。完整示例代码展示了如何创建和验证JWT令牌,以及如何保护路由。
categories:
tags:
扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意:https://tools.cmdragon.cn/
(注:根据用户要求,章节编号从"第一章"开始,不使用"深入"等词汇)
HTTP状态码是API与客户端沟通的第一语言。FastAPI建议采用以下规范:
示例:访问需要管理员权限的接口时,普通用户会收到403而非401,因为此时凭证验证已通过,但权限不足
建议统一错误响应格式以提升客户端处理效率:
{
"detail": {
"code": "AUTH-001", # 自定义错误编码
"message": "Token expired", # 人类可读信息
"type": "token_expired" # 机器识别类型
}
}
通过覆盖默认异常处理实现标准化:
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={
"detail": {
"code": exc.headers.get("X-Error-Code", "UNKNOWN"),
"message": exc.detail,
"type": exc.headers.get("X-Error-Type", "unknown")
}
},
headers=exc.headers
)
异常类型 | 检测方法 | 推荐状态码 |
---|---|---|
签名篡改 | 签名验证失败 | 401 |
过期令牌 | 检查exp字段 | 401 |
格式错误 | Header/Payload格式解析失败 | 401 |
from jose import JWTError, jwt
from fastapi import Depends, HTTPException
from pydantic import BaseModel
class TokenData(BaseModel):
username: str | None = None
async def validate_token(token: str = Depends(oauth2_scheme)) -> TokenData:
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
headers={"X-Error-Code": "AUTH-003"}
)
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM]
)
if (exp := payload.get("exp")) is None or exp
使用双令牌策略(access_token + refresh_token):
from datetime import datetime, timedelta
def create_tokens(username: str) -> dict:
access_expire = datetime.utcnow() + timedelta(minutes=15)
refresh_expire = datetime.utcnow() + timedelta(days=7)
access_payload = {"sub": username, "exp": access_expire, "type": "access"}
refresh_payload = {"sub": username, "exp": refresh_expire, "type": "refresh"}
return {
"access_token": jwt.encode(access_payload, SECRET_KEY, ALGORITHM),
"refresh_token": jwt.encode(refresh_payload, SECRET_KEY, ALGORITHM),
"expires_in": 900 # 秒数
}
# requirements.txt
fastapi == 0.68
.1
python - jose[cryptography] == 3.3
.0
passlib[bcrypt] == 1.7
.4
uvicorn == 0.15
.0
# main.py
from datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
# 配置参数
SECRET_KEY = "your-secret-key-here" # 生产环境应使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError as e:
error_type = "expired" if isinstance(e, jwt.ExpiredSignatureError) else "invalid"
raise HTTPException(
status_code=401,
detail=f"Token validation failed: {error_type}",
headers={"X-Error-Type": error_type}
) from e
return token_data
@app.post("/token")
async def login_for_access_token():
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": "fakeuser"}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/protected/")
async def read_protected_route(current_user: TokenData = Depends(get_current_user)):
return {"message": "Secure content accessed"}
当JWT令牌的签名被篡改时,应该返回什么HTTP状态码?
A) 400
B) 401
C) 403
D) 500
答案:B
解析:签名篡改属于凭证验证失败,应返回401 Unauthorized。403用于已认证用户权限不足的情况。
如何判断JWT令牌是否过期?
A) 检查签发时间(iat)
B) 比较当前时间与exp字段
C) 验证签名有效性
D) 解析payload内容
答案:B
解析:exp字段存储的是UTC时间戳,解码后与当前时间比较即可判断是否过期
报错1:jose.exceptions.JWTDecodeError: Signature verification failed
原因:令牌签名与服务器密钥不匹配
解决步骤:
报错2:HTTP 401 Unauthorized - Token expired
原因:访问时令牌已超过exp时间
解决方案:
预防建议:
(全文完)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:FastAPI安全异常处理:从401到422的奇妙冒险 | cmdragon's Blog
参与评论
手机查看
返回顶部