扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
STOMP(Simple Text Oriented Messaging Protocol)是一种基于文本的轻量级消息协议,常用于实现发布/订阅模式。与直接使用WebSocket相比,STOMP提供了更结构化的消息格式,支持以下核心功能:
在FastAPI中实现STOMP协议的核心思路是通过WebSocket建立连接后,在消息处理层添加STOMP协议解析器。整个过程分为三个阶段:
CONNECT
帧建立STOMP会话SUBSCRIBE
命令订阅消息通道SEND
命令向指定目的地发送消息以下示例代码演示了如何在FastAPI中实现STOMP协议支持:
# 环境依赖:fastapi==0.103.0 uvicorn==0.23.2 stomp.py==8.0.1
from fastapi import FastAPI, WebSocket
from stomp import parse_frame, Frame
app = FastAPI()
class StompManager:
def __init__(self):
self.subscriptions = {}
async def handle_connect(self, frame, websocket):
# 协议版本验证
if frame.headers.get('accept-version') != '1.2':
await websocket.send_text('ERRORnversion-not-supportednn')
return False
return True
async def handle_subscribe(self, frame, websocket):
destination = frame.headers['destination']
sub_id = frame.headers['id']
self.subscriptions[sub_id] = {
'destination': destination,
'websocket': websocket
}
@app.websocket("/stomp")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
manager = StompManager()
try:
while True:
data = await websocket.receive_text()
frame = parse_frame(data)
if frame.command == 'CONNECT':
if await manager.handle_connect(frame, websocket):
await websocket.send_text("CONNECTEDnversion:1.2nn")
elif frame.command == 'SUBSCRIBE':
await manager.handle_subscribe(frame, websocket)
elif frame.command == 'SEND':
# 消息路由逻辑
pass
except Exception as e:
print(f"连接异常: {str(e)}")
stomp.py
库的parse_frame
方法解析原始消息StompManager
类维护订阅关系CONNECT
阶段验证协议版本兼容性实现消息广播功能的核心代码:
from typing import Dict
from fastapi import WebSocket
from pydantic import BaseModel
class Subscription(BaseModel):
destination: str
websocket: WebSocket
class MessageRouter:
def __init__(self):
self.channels: Dict[str, list] = {}
async def add_subscriber(self, channel: str, websocket: WebSocket):
if channel not in self.channels:
self.channels[channel] = []
self.channels[channel].append(websocket)
async def broadcast(self, channel: str, message: str):
for ws in self.channels.get(channel, []):
await ws.send_text(message)
# 在SEND命令处理中调用
async def handle_send(frame, router: MessageRouter):
destination = frame.headers['destination']
await router.broadcast(destination, frame.body)
问题1:当客户端发送的STOMP协议版本不匹配时,服务端应该返回什么响应?
答案:服务端应返回ERROR
帧,并在headers中包含version-not-supported
错误码,立即关闭连接。
问题2:如何防止消息路由时的循环广播?
答案:在消息头中添加message-id
字段,服务端维护已处理消息ID的缓存,对重复ID的消息直接丢弃。
报错1:STOMP Protocol Error: Missing required header 'destination'
原因:SEND或SUBSCRIBE帧缺少destination头
解决方案:
if 'destination' not in frame.headers:
await websocket.send_text('ERRORnmissing-destinationnn')
报错2:WebSocket connection is already closed
原因:尝试向已关闭的连接发送消息
解决方案:
# 发送前检查连接状态
for ws in list(self.channels[channel]):
if ws.client_state == WebSocketState.DISCONNECTED:
self.channels[channel].remove(ws)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转STOMP协议升级,让你的消息传递更高效?
参与评论
手机查看
返回顶部