扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
实时协作系统需要实现多用户同时操作同一文档/白板,并实时同步所有变更。核心需求包括:
依赖安装:
pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2
核心代码实现:
import asyncio
import logging
from typing import List, Dict
from fastapi import FastAPI, WebSocket
from pydantic import BaseModel
app = FastAPI()
logger = logging.getLogger("uvicorn.error")
class Operation(BaseModel):
type: str # "insert" or "delete"
position: int
content: str = "" # 插入内容
length: int = 1 # 删除长度
client_id: str = "" # 客户端标识
version: int = 0 # 操作版本号
# OT转换引擎(示例)
class OTEngine:
@staticmethod
def transform(op1: Operation, op2: Operation) -> Operation:
"""操作转换核心算法"""
# 插入 vs 插入
if op1.type == "insert" and op2.type == "insert":
if op1.position op2.position:
return op2
else: # 相同位置按客户端ID排序
return op2 if op1.client_id op2.position:
return Operation(**{**op2.dict(), "position": op2.position})
else: # 相同位置取范围更大的删除
return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})
# 协同编辑房间管理器
class CollaborationRoom:
def __init__(self, room_id: str):
self.room_id = room_id
self.connections = set() # 实际使用redis实现, 这里使用set模拟
self.document = ""
self.version = 0
self.pending_ops: List[Operation] = []
self.lock = asyncio.Lock()
self.client_states: Dict[str, int] = {} # 客户端最后确认版本
async def add_connection(self, websocket: WebSocket, client_id: str):
async with self.lock:
self.connections.add(websocket)
self.client_states[client_id] = self.version
# 发送初始状态
await websocket.send_json({
"type": "snapshot",
"document": self.document,
"version": self.version
})
async def remove_connection(self, websocket: WebSocket, client_id: str):
async with self.lock:
self.connections.discard(websocket)
if client_id in self.client_states:
del self.client_states[client_id]
async def apply_operation(self, operation: Operation):
"""应用操作转换并更新文档"""
async with self.lock:
# 转换所有待处理操作
transformed_op = operation
for pending in self.pending_ops:
transformed_op = OTEngine.transform(pending, transformed_op)
# 应用转换后的操作
if transformed_op.type == "insert":
self.document = (self.document[:transformed_op.position] +
transformed_op.content +
self.document[transformed_op.position:])
elif transformed_op.type == "delete":
start = transformed_op.position
end = min(start + transformed_op.length, len(self.document))
self.document = self.document[:start] + self.document[end:]
# 更新状态
self.version += 1
self.pending_ops.append(transformed_op)
# 广播转换后的操作
broadcast_tasks = []
for conn in self.connections:
try:
broadcast_tasks.append(conn.send_json({
"type": "operation",
"operation": transformed_op.dict(),
"document": self.document,
"version": self.version
}))
except:
pass
await asyncio.gather(*broadcast_tasks, return_exceptions=True)
# 清除已处理的操作
min_client_version = min(self.client_states.values(), default=self.version)
self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]
# 全局房间管理
room_manager: Dict[str, CollaborationRoom] = {} # 实际使用redis实现, 这里使用字典模拟
global_lock = asyncio.Lock()
@app.websocket("/ws/{room_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
await websocket.accept()
# 获取或创建房间
async with global_lock:
if room_id not in room_manager:
room_manager[room_id] = CollaborationRoom(room_id)
room = room_manager[room_id]
# 加入房间
await room.add_connection(websocket, client_id)
logger.info(f"Client {client_id} joined room {room_id}")
try:
while True:
data = await websocket.receive_json()
op = Operation(**data)
op.client_id = client_id
# 应用操作转换
await room.apply_operation(op)
except Exception as e:
logger.error(f"Error in room {room_id}: {str(e)}")
finally:
# 离开房间
await room.remove_connection(websocket, client_id)
logger.info(f"Client {client_id} left room {room_id}")
# 清理空房间
async with global_lock:
if not room.connections:
del room_manager[room_id]
logger.info(f"Room {room_id} closed")
关键机制:
WebSocket
协议替代HTTP长轮询active_connections
优化技巧:
requestAnimationFrame
合并高频操作组件核心代码:
在线用户 ({{ users.length }})
{{ user }}
sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>ClientA: 用户输入操作 ClientA->>ClientA: 乐观更新UI ClientA->>Server: 发送操作(位置+内容) Server->>Server: 应用OT转换 Server->>ClientA: 广播确认操作 Server->>ClientB: 广播转换后操作 ClientB->>ClientB: 应用转换后操作
状态同步机制:
sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>Server: 发送操作OP_A(位置P) Server->>Server: 转换操作:OP_A' = OT(OP_A, 待处理操作) Server->>Server: 更新文档状态 Server->>ClientA: 广播转换后的OP_A' Server->>ClientB: 广播转换后的OP_A' ClientB->>ClientB: 应用OP_A'更新本地文档 ClientB->>Server: 发送新操作OP_B
冲突解决流程:
消息协议设计:
// 快照消息(初始同步)
{
"type": "snapshot",
"document": "当前文本",
"version": 15
}
// 操作消息
{
"type": "operation",
"operation": {
"type": "insert",
"position": 5,
"content": "hello",
"client_id": "user1",
"version": 16
},
"document": "更新后文本",
"version": 16
}
同步策略对比:
策略 | 优点 | 缺点 |
---|---|---|
最后写入优先 | 实现简单 | 可能丢失用户操作 |
操作转换(OT) | 精确解决冲突 | 算法复杂度高 |
CRDT | 无需中心协调 | 内存消耗较大 |
压力测试命令:
pip install websocket-client
python -m websockets ws://localhost:8000/ws/room1 -c 1000 -m "测试消息"
部署架构:
客户端 → Nginx → FastAPI (Uvicorn) → Redis Cluster
Nginx配置要点:
http {
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
location /ws/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
}
Q1:当两个用户同时在第5个字符位置插入不同内容时,如何保证最终一致性?
答案:采用操作转换算法,根据操作逻辑时间戳调整插入位置。例如用户A插入"X",用户B插入"Y",最终在第5位显示"YX"或"XY"
,取决于操作到达服务器的顺序。
Q2:WebSocket连接频繁断开可能是什么原因?
解决方案:
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
错误1:403 Forbidden
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
错误2:1006 Connection Closed Abnormally
function connect() {
const ws = new WebSocket(url)
ws.onclose = () => setTimeout(connect, 1000)
}
通过本文的实践,开发者可以掌握实时协作系统的核心实现技术。建议在开发过程中使用wireshark
工具监控WebSocket流量,并配合Chrome DevTools
的Performance面板进行前端性能优化。
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用WebSocket打造毫秒级实时协作系统?
参与评论
手机查看
返回顶部