扫描二维码
关注或者微信搜一搜:编程智域 前端至全栈交流与成长
发现1000+提升效率与开发的AI工具和实用程序:https://tools.cmdragon.cn/
MQTT协议基础与FastAPI集成原理
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息协议,采用"主题-消息"
机制实现设备间通信。其工作流程可通过以下时序图表示:
[设备A] --发布消息到/temperature主题--> [MQTT Broker]
[FastAPI服务] --订阅/temperature主题--> [MQTT Broker]
[Broker] --推送消息--> [FastAPI服务]
# 导入必要的库
from fastapi import FastAPI
from fastapi_mqtt import FastMQTT
from pydantic import BaseModel
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MQTT_OTA")
# 初始化FastAPI应用
app = FastAPI(title="IoT设备OTA升级服务")
# 配置MQTT连接参数(示例使用公共测试服务器)
mqtt_config = {
"host": "test.mosquitto.org",
"port": 1883,
"keepalive": 60,
"client_id": "fastapi_ota_server"
}
# 初始化MQTT客户端
mqtt = FastMQTT(config=mqtt_config)
mqtt.init_app(app)
# 定义Pydantic数据模型
class FirmwareUpdate(BaseModel):
device_id: str
firmware_version: str
chunk_size: int = 512 # 默认分片大小512KB
checksum: str
# MQTT连接状态回调
@mqtt.on_connect()
def handle_connect(client, flags, rc, properties):
logger.info(f"Connected with result code {rc}")
mqtt.client.subscribe("/ota/+/request") # 订阅设备升级请求主题
# 设备升级请求处理
@app.post("/firmware/upgrade")
async def create_upgrade_task(update: FirmwareUpdate):
"""创建固件升级任务"""
# 此处添加数据库记录存储逻辑
return {"task_id": "OTA_20230801_001", "status": "queued"}
# MQTT消息处理
@mqtt.on_message()
async def message_handler(client, topic, payload, qos, properties):
"""处理设备端MQTT消息"""
device_id = topic.split("/")[2]
if "request" in topic:
handle_upgrade_request(device_id, payload)
elif "progress" in topic:
logger.info(f"设备{device_id}升级进度: {payload.decode()}")
elif "verify" in topic:
handle_verification(device_id, payload)
def handle_upgrade_request(device_id: str, payload: bytes):
"""处理升级请求"""
try:
request_data = json.loads(payload)
# 验证设备合法性
if validate_device(device_id):
# 推送升级元数据
metadata = {
"url": f"https://firmware.example.com/{request_data['version']}.bin",
"size": 2048000,
"checksum": "sha256:9f86d08..."
}
mqtt.publish(f"/ota/{device_id}/metadata", json.dumps(metadata))
except Exception as e:
logger.error(f"处理请求错误: {str(e)}")
# 运行参数配置
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
代码说明:
依赖库及版本:
功能实现要点:
on_connect
装饰器实现MQTT连接成功后的自动订阅/ota/+/request
监听所有设备的升级请求协议升级流程:
课后Quiz:
问题:当设备端收到多个升级任务时,如何保证升级顺序的正确性?
答案:B
解析:服务端应维护优先级队列,紧急安全更新优先于常规更新,同时需要实现任务状态锁避免并发冲突
问题:MQTT的QoS等级设置为2时,可能带来什么影响?
答案:B
解析:QoS 2通过四次握手保证精确一次传输,会增加通信开销但确保可靠性,适合关键操作
常见报错处理:
MQTT连接失败(Connection Refused)
现象:客户端持续收到Connection Refused错误
排查步骤:
telnet
命令测试端口连通性固件校验失败(Checksum Mismatch)
解决方案:
def verify_firmware(data: bytes, checksum: str) -> bool:
import hashlib
sha256 = hashlib.sha256()
sha256.update(data)
return sha256.hexdigest() == checksum.split(":")[1]
设备响应超时(Timeout Error)
优化建议:
mqtt.client.reconnect_delay_set(min_delay=1, max_delay=120)
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:IoT设备的OTA升级是如何通过MQTT协议实现无缝对接的?
参与评论
手机查看
返回顶部