扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
实现原理:通过HTTP头携带JWT令牌完成握手阶段的认证,认证失败时返回403状态码中断连接。需使用websockets
库处理连接状态,结合jwt
库进行令牌解码。
认证流程图解:
[客户端] --携带JWT--> [握手请求]
[服务端] --解码令牌--> [验证权限]
验证成功 --> 建立WebSocket连接
验证失败 --> 返回HTTP 403
核心代码实现(需要安装依赖:python-jose[cryptography]==3.3.0
):
from fastapi import WebSocket, HTTPException
from jose import JWTError, jwt
async def websocket_auth(websocket: WebSocket):
token = websocket.headers.get("Authorization")
if not token:
raise HTTPException(status_code=403, detail="未携带令牌")
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload["sub"]
except JWTError:
await websocket.close(code=4001) # 自定义关闭代码
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
username = await websocket_auth(websocket)
# 后续业务逻辑
配置要点:需在中间件中显式声明websocket
协议,并设置allow_origins
白名单:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-domain.com"],
allow_methods=["GET", "POST", "websocket"],
allow_headers=["Authorization"]
)
三层防御策略:
slowapi
实现速率限制from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.websocket("/chat")
@limiter.limit("10/minute") # 每分钟10条消息
async def chat_handler(websocket: WebSocket):
...
limit_req_zone $binary_remote_addr zone=wslimit:10m rate=20r/s;
location /ws {
limit_req zone=wslimit burst=30;
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
MAX_CONNECTIONS = 1000
active_connections = set()
@app.on_event("startup")
async def startup():
# 连接数监控任务
async def monitor():
while True:
if len(active_connections) > MAX_CONNECTIONS * 0.9:
logger.warning("连接数接近上限")
await asyncio.sleep(10)
asyncio.create_task(monitor())
连接池调优参数(uvicorn启动命令):
uvicorn main:app --workers 4 --ws websockets --limit-concurrency 2000
异步任务分流示例:
import concurrent.filters
@app.websocket("/data-stream")
async def data_stream(websocket: WebSocket):
await websocket.accept()
# CPU密集型任务转线程池执行
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # 使用默认线程池
cpu_intensive_task,
request_data
)
await websocket.send_json(result)
问题1:当WebSocket连接因JWT过期被拒绝时,客户端应该如何处理?
解析:客户端需要捕获连接关闭事件(code 4001),触发令牌刷新流程,获取新令牌后重新建立连接。
问题2:如何验证CORS配置是否对WebSocket生效?
解析:可通过浏览器开发者工具的Network面板查看握手请求的Response Headers中是否包含Access-Control-Allow-Origin
字段。
报错1:403 Forbidden
during WebSocket handshake
原因:未正确传递Authorization头或CORS配置未包含websocket协议
解决:
new WebSocket(`ws://api.com/ws`, {
headers: {Authorization: `Bearer ${token}`}
})
报错2:RuntimeError: Cannot call "receive" once a close message has been sent.
原因:在连接关闭后仍尝试操作WebSocket对象
解决:在所有的await websocket.receive()调用处添加try-except块:
try:
data = await websocket.receive_json()
except WebSocketDisconnect:
break # 退出消息循环
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何让你的WebSocket连接既安全又高效?
参与评论
手机查看
返回顶部