扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
# 示例代码运行环境
# Python 3.8+
# 安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7
使用@app.websocket装饰器声明WebSocket路由:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/chat/{room_id}")
async def websocket_chat(websocket: WebSocket, room_id: int):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Room {room_id}: {data}")
except WebSocketDisconnect:
print("Client disconnected")
关键要点解析:
结合路径参数与查询参数进行复合验证:
from fastapi import Query, WebSocket
@app.websocket("/ws/secure/{client_id}")
async def secure_ws(
websocket: WebSocket,
client_id: int = Path(..., gt=0),
token: str = Query(..., min_length=8)
):
if not validate_token(token):
await websocket.close(code=1008)
return
# ...连接处理逻辑...
自定义握手验证流程:
@app.websocket("/ws/auth")
async def auth_ws(websocket: WebSocket):
# 手动控制握手过程
await websocket.accept()
# 获取握手时的请求头
headers = websocket.headers
auth_token = headers.get("authorization")
if not verify_jwt_token(auth_token):
await websocket.close(code=1008, reason="Invalid credentials")
return
# 验证通过后的处理逻辑
使用字典维护活跃连接:
from typing import Dict
active_connections: Dict[str, WebSocket] = {}
@app.websocket("/ws/status")
async def status_ws(websocket: WebSocket):
await websocket.accept()
client_id = str(websocket.client)
active_connections[client_id] = websocket
try:
while True:
# 保持连接活跃
await websocket.receive_text()
except WebSocketDisconnect:
del active_connections[client_id]
import asyncio
async def heartbeat(websocket: WebSocket, interval: int = 30):
try:
while True:
await asyncio.sleep(interval)
await websocket.send_json({
"type": "heartbeat",
"timestamp": time.time()
})
except WebSocketDisconnect:
print("Heartbeat terminated")
@app.websocket("/ws/realtime")
async def realtime_ws(websocket: WebSocket):
await websocket.accept()
# 启动独立心跳任务
heartbeat_task = asyncio.create_task(heartbeat(websocket))
try:
# 主消息处理循环
while True:
data = await websocket.receive_json()
# 处理业务逻辑...
finally:
heartbeat_task.cancel()
await websocket.close()
# 正确处理方法
try:
data = await websocket.receive_json()
except WebSocketDisconnect:
# 处理断开逻辑
except ValueError:
await websocket.send_text("ERROR: 仅支持JSON格式")
# 错误处理方法:直接使用未校验的receive()
# 正确方法
client_ip = websocket.client.host
# 常见错误:直接读取X-Forwarded-For头
# 需要配合代理设置处理
现象:客户端连接时立即断开
分析:未通过握手验证,常见于:
# 在拒绝连接时立即关闭
if not valid:
await websocket.close(code=1008)
return # 必须立即返回
现象:客户端非正常断开
处理方案:
try:
while True:
data = await websocket.receive_text()
except WebSocketDisconnect as e:
print(f"断开代码:{e.code}")
场景:发送非文本数据时
正确做法:
# 发送二进制数据
await websocket.send_bytes(binary_data)
# 发送文本数据
await websocket.send_text("message")
# 发送JSON
await websocket.send_json({"key": "value"})
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket,让实时通信不再烦恼?
参与评论
手机查看
返回顶部