扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
# 运行环境:Python 3.8+
# 安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收文本消息
client_msg = await websocket.receive_text()
# 处理消息(示例:添加时间戳)
server_response = f"[{datetime.now()}] Server received: {client_msg}"
# 发送文本响应
await websocket.send_text(server_response)
except WebSocketDisconnect:
print("Client disconnected")
ws://
协议建立WebSocket连接await websocket.accept()
接受连接receive_text()
和send_text()
应用场景:实时聊天室、协同编辑系统、实时日志监控
@app.websocket("/ws/file-transfer")
async def websocket_file(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收二进制数据
binary_data = await websocket.receive_bytes()
# 保存文件示例
with open("received_file.bin", "wb") as f:
f.write(binary_data)
# 发送确认消息
await websocket.send_bytes(b"FILE_RECEIVED")
except WebSocketDisconnect:
print("File transfer interrupted")
receive_bytes()
和send_bytes()
方法from pydantic import BaseModel
class MessageModel(BaseModel):
user: str
content: str
timestamp: float
@app.websocket("/ws/json-demo")
async def websocket_json(websocket: WebSocket):
await websocket.accept()
try:
while True:
json_data = await websocket.receive_json()
# 自动验证JSON结构
message = MessageModel(**json_data)
# 处理业务逻辑
processed_data = message.dict()
processed_data["status"] = "PROCESSED"
# 返回处理结果
await websocket.send_json(processed_data)
except ValidationError as e:
await websocket.send_json({"error": str(e)})
from websockets.exceptions import ConnectionClosed
@app.websocket("/ws/with-timeout")
async def websocket_timeout(websocket: WebSocket):
await websocket.accept()
try:
while True:
try:
# 设置10秒接收超时
data = await asyncio.wait_for(websocket.receive_text(), timeout=10)
await process_message(data)
except asyncio.TimeoutError:
# 发送心跳包保持连接
await websocket.send_text("HEARTBEAT")
except ConnectionClosed:
print("Connection closed normally")
asyncio.wait_for
设置单次接收超时Q1:如何处理同时接收文本和二进制消息的场景?
A:通过receive()
方法获取消息类型判断:
message = await websocket.receive()
if message["type"] == "websocket.receive.text":
handle_text(message["text"])
elif message["type"] == "websocket.receive.bytes":
handle_bytes(message["bytes"])
Q2:为什么推荐使用Pydantic进行JSON验证?
A:① 自动类型转换 ② 字段约束检查 ③ 防御无效数据 ④ 生成API文档
422 Validation Error
{
"detail": [
{
"loc": [
"body",
"timestamp"
],
"msg": "field required",
"type": "value_error.missing"
}
]
}
解决方法:
timestamp: float = None
WebSocketTimeoutException
wait_for
超时参数,添加心跳机制余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket消息处理?
参与评论
手机查看
返回顶部