MQTT 的保留消息(Retained Messages)是一种特殊的消息机制,允许 Broker 保存最新消息,供新订阅者接收。
保留消息的概念
定义
保留消息是 Broker 持久化存储的消息,当有新的客户端订阅该主题时,Broker 会立即将保留消息发送给该客户端。
作用
- 状态同步:新订阅者可以立即获取最新状态
- 初始化数据:为新连接的客户端提供初始数据
- 状态恢复:帮助客户端快速恢复到最新状态
- 减少请求:避免客户端主动请求最新状态
保留消息的工作原理
设置保留消息
发布消息时设置 Retain 标志:
shellPUBLISH 报文参数: - Topic: 主题名称 - Payload: 消息内容 - QoS: QoS 级别 - Retain: true(设置为保留消息)
保留消息的存储
- 存储位置:Broker 内存或持久化存储
- 存储数量:每个主题只保留一条最新消息
- 存储覆盖:新发布的保留消息会覆盖之前的保留消息
保留消息的发送
当客户端订阅主题时:
- 客户端发送 SUBSCRIBE 报文
- Broker 检查该主题是否有保留消息
- 如果有,立即发送保留消息给客户端
- 然后发送后续的新消息
保留消息的特性
1. 每个主题一条
- 规则:每个主题只保留一条最新的保留消息
- 覆盖机制:新发布的保留消息会替换之前的
- 清除机制:发布空消息(Payload 为空)可以清除保留消息
2. QoS 级别
- 继承性:保留消息的 QoS 级别由发布时决定
- 订阅限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置
- QoS 规则:实际 QoS = min(发布 QoS, 订阅 QoS)
3. 持久化
- 内存存储:默认存储在内存中
- 持久化存储:可配置持久化到磁盘
- Broker 重启:持久化的保留消息在 Broker 重启后仍然存在
4. 消息顺序
- 发送顺序:保留消息在普通消息之前发送
- 订阅时机:只在订阅时发送一次
- 后续消息:不重复发送保留消息
使用场景
1. 设备状态同步
shell场景:温度传感器 保留主题:sensor/123/temperature 保留消息:{"value": 25.5, "unit": "C", "timestamp": 1234567890} 新订阅者订阅 sensor/123/temperature 立即收到最新温度值
2. 配置信息发布
shell场景:设备配置 保留主题:config/device/123 保留消息:{"mode": "auto", "interval": 60} 新设备上线订阅配置主题 立即获取最新配置
3. 系统状态广播
shell场景:系统状态 保留主题:system/status 保留消息:{"status": "running", "version": "1.0.0"} 新客户端订阅系统状态 立即获取当前系统状态
4. 开关状态
shell场景:智能开关 保留主题:switch/123/state 保留消息:{"state": "on"} 新订阅者立即获取开关状态
代码示例
Python (paho-mqtt)
pythonimport paho.mqtt.client as mqtt import json import time def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 订阅主题 client.subscribe("sensor/+/temperature") def on_message(client, userdata, msg): print(f"Received: {msg.topic} - {msg.payload.decode()}") print(f"Retained: {msg.retain}") # 发布保留消息 client = mqtt.Client() client.connect("broker.example.com", 1883, 60) # 发布保留消息(retain=True) message = {"value": 25.5, "unit": "C", "timestamp": int(time.time())} client.publish("sensor/123/temperature", json.dumps(message), retain=True) # 清除保留消息(发布空消息) # client.publish("sensor/123/temperature", "", retain=True) client.disconnect() # 订阅者 subscriber = mqtt.Client() subscriber.on_connect = on_connect subscriber.on_message = on_message subscriber.connect("broker.example.com", 1883, 60) subscriber.loop_forever()
JavaScript (MQTT.js)
javascriptconst mqtt = require('mqtt'); // 发布保留消息 const publisher = mqtt.connect('mqtt://broker.example.com'); publisher.on('connect', () => { console.log('Publisher connected'); // 发布保留消息(retain: true) const message = JSON.stringify({ value: 25.5, unit: 'C', timestamp: Date.now() }); publisher.publish('sensor/123/temperature', message, { retain: true }); // 清除保留消息(发布空消息) // publisher.publish('sensor/123/temperature', '', { retain: true }); publisher.end(); }); // 订阅者 const subscriber = mqtt.connect('mqtt://broker.example.com'); subscriber.on('connect', () => { console.log('Subscriber connected'); subscriber.subscribe('sensor/+/temperature'); }); subscriber.on('message', (topic, message) => { console.log(`Received: ${topic} - ${message.toString()}`); console.log(`Retained: ${message.retain}`); });
最佳实践
1. 保留消息设计
- 状态信息:保留消息应该表示当前状态
- 简洁明了:消息内容简洁,易于解析
- 包含时间戳:便于判断消息的新旧程度
- 版本控制:可以包含版本信息
2. 主题命名
shell推荐格式: - sensor/{device_id}/temperature - config/{device_id} - status/{system_id} 避免使用: - 通配符主题(不能发布到通配符主题) - 过于复杂的主题结构
3. 消息大小
- 限制大小:保留消息不宜过大
- 建议大小:通常小于 1KB
- Broker 限制:注意 Broker 对消息大小的限制
4. QoS 选择
- 一般状态:QoS 0
- 重要状态:QoS 1
- 关键状态:QoS 2
5. 清除机制
- 主动清除:发布空消息清除保留消息
- 定期清理:定期检查和清理过期的保留消息
- 生命周期管理:为保留消息设置合理的生命周期
注意事项
-
内存占用:保留消息会占用 Broker 内存,大量保留消息可能影响性能
-
持久化配置:如果需要保留消息在 Broker 重启后仍然存在,需要配置持久化
-
消息更新:频繁更新保留消息会增加 Broker 负担
-
订阅时机:保留消息只在订阅时发送,不会重复发送
-
QoS 限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置
-
空消息清除:发布空消息(Payload 为空)可以清除保留消息
保留消息 vs 遗嘱消息
| 特性 | 保留消息 | 遗嘱消息 |
|---|---|---|
| 触发时机 | 订阅时 | 异常断开时 |
| 消息来源 | 发布者设置 | 客户端设置 |
| 存储位置 | Broker | Broker |
| 发送对象 | 新订阅者 | 订阅该主题的客户端 |
| 消息数量 | 每主题一条 | 每客户端一条 |
| 清除方式 | 发布空消息 | 正常断开或重新连接 |
保留消息的局限性
- 每主题一条:每个主题只能保留一条消息,无法保存历史消息
- 内存占用:大量保留消息会占用较多内存
- 实时性:保留消息可能不是最新的(取决于发布频率)
- 无历史记录:无法获取历史状态变化
- 依赖 Broker:完全依赖 Broker 的可靠性
MQTT 保留消息是物联网应用中非常重要的机制,合理使用可以有效实现状态同步和初始化,提高用户体验和系统可靠性。