Erlo

如何用WebSocket打造毫秒级实时协作系统?

2025-07-11 16:29:09 发布   56 浏览  
页面报错/反馈
收藏 点赞

cmdragon_cn.png cmdragon_cn.png

扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长

发现1000+提升效率与开发的AI工具和实用程序https://tools.cmdragon.cn/

第六章:全栈项目实战示例:实时协作系统

一、需求分析:实时白板/协同编辑场景

实时协作系统需要实现多用户同时操作同一文档/白板,并实时同步所有变更。核心需求包括:

  1. 毫秒级延迟:用户操作需在300ms内同步给所有参与者
  2. 操作一致性:保证最终所有客户端呈现相同内容
  3. 冲突处理:解决多用户同时修改同一区域的问题
  4. 状态恢复:断线重连后自动同步最新状态
sequenceDiagram participant 用户A participant 用户B participant 服务器 用户A ->> 服务器: 用户A登录 服务器 -->> 用户A: 登录成功响应 用户B ->> 服务器: 用户B登录 服务器 -->> 用户B: 登录成功响应 用户A ->> 服务器: 用户A创建新的白板会话 服务器 -->> 用户A: 白板会话ID 用户B ->> 服务器: 加入现有的白板会话 (会话ID) 服务器 -->> 用户B: 加入成功响应 用户A ->> 服务器: 用户A在白板上绘图/编辑 服务器 -->> 用户B: 更新操作 用户B ->> 服务器: 用户B在白板上绘图/编辑 服务器 -->> 用户A: 更新操作 用户A ->> 服务器: 保存白板内容 服务器 -->> 用户A: 保存成功确认 用户B ->> 服务器: 检视白板内容 服务器 -->> 用户B: 白板内容数据 用户A ->> 服务器: 退出白板会话 服务器 -->> 用户A: 退出成功确认 用户B ->> 服务器: 退出白板会话 服务器 -->> 用户B: 退出成功确认

二、后端WebSocket服务搭建

依赖安装

pip install fastapi==0.104.0 websockets==12.0 uvicorn==0.23.2 pydantic==2.5.2

核心代码实现

import asyncio
import logging
from typing import List, Dict

from fastapi import FastAPI, WebSocket
from pydantic import BaseModel

app = FastAPI()
logger = logging.getLogger("uvicorn.error")


class Operation(BaseModel):
    type: str  # "insert" or "delete"
    position: int
    content: str = ""  # 插入内容
    length: int = 1  # 删除长度
    client_id: str = ""  # 客户端标识
    version: int = 0  # 操作版本号


# OT转换引擎(示例)
class OTEngine:
    @staticmethod
    def transform(op1: Operation, op2: Operation) -> Operation:
        """操作转换核心算法"""
        # 插入 vs 插入
        if op1.type == "insert" and op2.type == "insert":
            if op1.position  op2.position:
                return op2
            else:  # 相同位置按客户端ID排序
                return op2 if op1.client_id  op2.position:
                return Operation(**{**op2.dict(), "position": op2.position})
            else:  # 相同位置取范围更大的删除
                return op2 if op1.length >= op2.length else Operation(**{**op2.dict(), "length": op1.length})


# 协同编辑房间管理器
class CollaborationRoom:
    def __init__(self, room_id: str):
        self.room_id = room_id
        self.connections = set()  # 实际使用redis实现, 这里使用set模拟
        self.document = ""
        self.version = 0
        self.pending_ops: List[Operation] = []
        self.lock = asyncio.Lock()
        self.client_states: Dict[str, int] = {}  # 客户端最后确认版本

    async def add_connection(self, websocket: WebSocket, client_id: str):
        async with self.lock:
            self.connections.add(websocket)
            self.client_states[client_id] = self.version
            # 发送初始状态
            await websocket.send_json({
                "type": "snapshot",
                "document": self.document,
                "version": self.version
            })

    async def remove_connection(self, websocket: WebSocket, client_id: str):
        async with self.lock:
            self.connections.discard(websocket)
            if client_id in self.client_states:
                del self.client_states[client_id]

    async def apply_operation(self, operation: Operation):
        """应用操作转换并更新文档"""
        async with self.lock:
            # 转换所有待处理操作
            transformed_op = operation
            for pending in self.pending_ops:
                transformed_op = OTEngine.transform(pending, transformed_op)

            # 应用转换后的操作
            if transformed_op.type == "insert":
                self.document = (self.document[:transformed_op.position] +
                                 transformed_op.content +
                                 self.document[transformed_op.position:])
            elif transformed_op.type == "delete":
                start = transformed_op.position
                end = min(start + transformed_op.length, len(self.document))
                self.document = self.document[:start] + self.document[end:]

            # 更新状态
            self.version += 1
            self.pending_ops.append(transformed_op)

            # 广播转换后的操作
            broadcast_tasks = []
            for conn in self.connections:
                try:
                    broadcast_tasks.append(conn.send_json({
                        "type": "operation",
                        "operation": transformed_op.dict(),
                        "document": self.document,
                        "version": self.version
                    }))
                except:
                    pass
            await asyncio.gather(*broadcast_tasks, return_exceptions=True)

            # 清除已处理的操作
            min_client_version = min(self.client_states.values(), default=self.version)
            self.pending_ops = [op for op in self.pending_ops if op.version >= min_client_version]


# 全局房间管理
room_manager: Dict[str, CollaborationRoom] = {}  # 实际使用redis实现, 这里使用字典模拟
global_lock = asyncio.Lock()


@app.websocket("/ws/{room_id}/{client_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, client_id: str):
    await websocket.accept()

    # 获取或创建房间
    async with global_lock:
        if room_id not in room_manager:
            room_manager[room_id] = CollaborationRoom(room_id)
        room = room_manager[room_id]

    # 加入房间
    await room.add_connection(websocket, client_id)
    logger.info(f"Client {client_id} joined room {room_id}")

    try:
        while True:
            data = await websocket.receive_json()
            op = Operation(**data)
            op.client_id = client_id

            # 应用操作转换
            await room.apply_operation(op)

    except Exception as e:
        logger.error(f"Error in room {room_id}: {str(e)}")
    finally:
        # 离开房间
        await room.remove_connection(websocket, client_id)
        logger.info(f"Client {client_id} left room {room_id}")

        # 清理空房间
        async with global_lock:
            if not room.connections:
                del room_manager[room_id]
                logger.info(f"Room {room_id} closed")

关键机制

  1. 使用WebSocket协议替代HTTP长轮询
  2. 维护活动连接池active_connections
  3. 通过Pydantic模型验证操作格式
  4. 广播模式实现实时同步

优化技巧

  1. 使用requestAnimationFrame合并高频操作
  2. 添加操作版本号解决时序问题
  3. 实现本地缓存防止数据丢失
  4. 添加心跳机制检测连接状态

三、前端Vue.js连接实现

组件核心代码








sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>ClientA: 用户输入操作 ClientA->>ClientA: 乐观更新UI ClientA->>Server: 发送操作(位置+内容) Server->>Server: 应用OT转换 Server->>ClientA: 广播确认操作 Server->>ClientB: 广播转换后操作 ClientB->>ClientB: 应用转换后操作

四、消息同步策略与冲突解决

  1. 状态同步机制

    sequenceDiagram participant ClientA participant Server participant ClientB ClientA->>Server: 发送操作OP_A(位置P) Server->>Server: 转换操作:OP_A' = OT(OP_A, 待处理操作) Server->>Server: 更新文档状态 Server->>ClientA: 广播转换后的OP_A' Server->>ClientB: 广播转换后的OP_A' ClientB->>ClientB: 应用OP_A'更新本地文档 ClientB->>Server: 发送新操作OP_B

  2. 冲突解决流程

    • 客户端发送操作时携带当前位置和版本
    • 服务端对并发操作进行转换排序
    • 转换后操作广播给所有客户端
    • 客户端收到操作后无条件应用
  3. 消息协议设计

    // 快照消息(初始同步)
    {
      "type": "snapshot",
      "document": "当前文本",
      "version": 15
    }
    
    // 操作消息
    {
      "type": "operation",
      "operation": {
         "type": "insert",
         "position": 5,
         "content": "hello",
         "client_id": "user1",
         "version": 16
      },
      "document": "更新后文本",
      "version": 16
    }
    
    

同步策略对比

策略 优点 缺点
最后写入优先 实现简单 可能丢失用户操作
操作转换(OT) 精确解决冲突 算法复杂度高
CRDT 无需中心协调 内存消耗较大

五、压力测试与部署方案

压力测试命令

pip install websocket-client
python -m websockets ws://localhost:8000/ws/room1 -c 1000 -m "测试消息"

部署架构

客户端 → Nginx → FastAPI (Uvicorn) → Redis Cluster

Nginx配置要点

http {
    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }

    server {
        location /ws/ {
            proxy_pass http://backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
        }
    }
}

课后 Quiz

Q1:当两个用户同时在第5个字符位置插入不同内容时,如何保证最终一致性?

答案:采用操作转换算法,根据操作逻辑时间戳调整插入位置。例如用户A插入"X",用户B插入"Y",最终在第5位显示"YX"或"XY"
,取决于操作到达服务器的顺序。

Q2:WebSocket连接频繁断开可能是什么原因?

解决方案

  1. 检查防火墙设置是否允许WS协议
  2. 配置合理的心跳间隔(建议30秒)
  3. 增加Nginx的超时设置:
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
    

常见报错处理

错误1:403 Forbidden

  • 原因:跨域请求被阻止
  • 解决:添加CORS中间件
    from fastapi.middleware.cors import CORSMiddleware
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    

错误2:1006 Connection Closed Abnormally

  • 原因:客户端未正确处理断开事件
  • 解决:添加重连机制
    function connect() {
      const ws = new WebSocket(url)
      ws.onclose = () => setTimeout(connect, 1000)
    }
    

通过本文的实践,开发者可以掌握实时协作系统的核心实现技术。建议在开发过程中使用wireshark
工具监控WebSocket流量,并配合Chrome DevTools的Performance面板进行前端性能优化。

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何用WebSocket打造毫秒级实时协作系统?

往期文章归档:

免费好用的热门在线工具

登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认