乐闻世界logo
搜索文章和话题

MQTT 的保留消息(Retained Messages)是什么?如何使用?

2月21日 15:44

MQTT 的保留消息(Retained Messages)是一种特殊的消息机制,允许 Broker 保存最新消息,供新订阅者接收。

保留消息的概念

定义

保留消息是 Broker 持久化存储的消息,当有新的客户端订阅该主题时,Broker 会立即将保留消息发送给该客户端。

作用

  • 状态同步:新订阅者可以立即获取最新状态
  • 初始化数据:为新连接的客户端提供初始数据
  • 状态恢复:帮助客户端快速恢复到最新状态
  • 减少请求:避免客户端主动请求最新状态

保留消息的工作原理

设置保留消息

发布消息时设置 Retain 标志:

shell
PUBLISH 报文参数: - Topic: 主题名称 - Payload: 消息内容 - QoS: QoS 级别 - Retain: true(设置为保留消息)

保留消息的存储

  • 存储位置:Broker 内存或持久化存储
  • 存储数量:每个主题只保留一条最新消息
  • 存储覆盖:新发布的保留消息会覆盖之前的保留消息

保留消息的发送

当客户端订阅主题时:

  1. 客户端发送 SUBSCRIBE 报文
  2. Broker 检查该主题是否有保留消息
  3. 如果有,立即发送保留消息给客户端
  4. 然后发送后续的新消息

保留消息的特性

1. 每个主题一条

  • 规则:每个主题只保留一条最新的保留消息
  • 覆盖机制:新发布的保留消息会替换之前的
  • 清除机制:发布空消息(Payload 为空)可以清除保留消息

2. QoS 级别

  • 继承性:保留消息的 QoS 级别由发布时决定
  • 订阅限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置
  • QoS 规则:实际 QoS = min(发布 QoS, 订阅 QoS)

3. 持久化

  • 内存存储:默认存储在内存中
  • 持久化存储:可配置持久化到磁盘
  • Broker 重启:持久化的保留消息在 Broker 重启后仍然存在

4. 消息顺序

  • 发送顺序:保留消息在普通消息之前发送
  • 订阅时机:只在订阅时发送一次
  • 后续消息:不重复发送保留消息

使用场景

1. 设备状态同步

shell
场景:温度传感器 保留主题:sensor/123/temperature 保留消息:{"value": 25.5, "unit": "C", "timestamp": 1234567890} 新订阅者订阅 sensor/123/temperature 立即收到最新温度值

2. 配置信息发布

shell
场景:设备配置 保留主题:config/device/123 保留消息:{"mode": "auto", "interval": 60} 新设备上线订阅配置主题 立即获取最新配置

3. 系统状态广播

shell
场景:系统状态 保留主题:system/status 保留消息:{"status": "running", "version": "1.0.0"} 新客户端订阅系统状态 立即获取当前系统状态

4. 开关状态

shell
场景:智能开关 保留主题:switch/123/state 保留消息:{"state": "on"} 新订阅者立即获取开关状态

代码示例

Python (paho-mqtt)

python
import paho.mqtt.client as mqtt import json import time def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 订阅主题 client.subscribe("sensor/+/temperature") def on_message(client, userdata, msg): print(f"Received: {msg.topic} - {msg.payload.decode()}") print(f"Retained: {msg.retain}") # 发布保留消息 client = mqtt.Client() client.connect("broker.example.com", 1883, 60) # 发布保留消息(retain=True) message = {"value": 25.5, "unit": "C", "timestamp": int(time.time())} client.publish("sensor/123/temperature", json.dumps(message), retain=True) # 清除保留消息(发布空消息) # client.publish("sensor/123/temperature", "", retain=True) client.disconnect() # 订阅者 subscriber = mqtt.Client() subscriber.on_connect = on_connect subscriber.on_message = on_message subscriber.connect("broker.example.com", 1883, 60) subscriber.loop_forever()

JavaScript (MQTT.js)

javascript
const mqtt = require('mqtt'); // 发布保留消息 const publisher = mqtt.connect('mqtt://broker.example.com'); publisher.on('connect', () => { console.log('Publisher connected'); // 发布保留消息(retain: true) const message = JSON.stringify({ value: 25.5, unit: 'C', timestamp: Date.now() }); publisher.publish('sensor/123/temperature', message, { retain: true }); // 清除保留消息(发布空消息) // publisher.publish('sensor/123/temperature', '', { retain: true }); publisher.end(); }); // 订阅者 const subscriber = mqtt.connect('mqtt://broker.example.com'); subscriber.on('connect', () => { console.log('Subscriber connected'); subscriber.subscribe('sensor/+/temperature'); }); subscriber.on('message', (topic, message) => { console.log(`Received: ${topic} - ${message.toString()}`); console.log(`Retained: ${message.retain}`); });

最佳实践

1. 保留消息设计

  • 状态信息:保留消息应该表示当前状态
  • 简洁明了:消息内容简洁,易于解析
  • 包含时间戳:便于判断消息的新旧程度
  • 版本控制:可以包含版本信息

2. 主题命名

shell
推荐格式: - sensor/{device_id}/temperature - config/{device_id} - status/{system_id} 避免使用: - 通配符主题(不能发布到通配符主题) - 过于复杂的主题结构

3. 消息大小

  • 限制大小:保留消息不宜过大
  • 建议大小:通常小于 1KB
  • Broker 限制:注意 Broker 对消息大小的限制

4. QoS 选择

  • 一般状态:QoS 0
  • 重要状态:QoS 1
  • 关键状态:QoS 2

5. 清除机制

  • 主动清除:发布空消息清除保留消息
  • 定期清理:定期检查和清理过期的保留消息
  • 生命周期管理:为保留消息设置合理的生命周期

注意事项

  1. 内存占用:保留消息会占用 Broker 内存,大量保留消息可能影响性能

  2. 持久化配置:如果需要保留消息在 Broker 重启后仍然存在,需要配置持久化

  3. 消息更新:频繁更新保留消息会增加 Broker 负担

  4. 订阅时机:保留消息只在订阅时发送,不会重复发送

  5. QoS 限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置

  6. 空消息清除:发布空消息(Payload 为空)可以清除保留消息

保留消息 vs 遗嘱消息

特性保留消息遗嘱消息
触发时机订阅时异常断开时
消息来源发布者设置客户端设置
存储位置BrokerBroker
发送对象新订阅者订阅该主题的客户端
消息数量每主题一条每客户端一条
清除方式发布空消息正常断开或重新连接

保留消息的局限性

  1. 每主题一条:每个主题只能保留一条消息,无法保存历史消息
  2. 内存占用:大量保留消息会占用较多内存
  3. 实时性:保留消息可能不是最新的(取决于发布频率)
  4. 无历史记录:无法获取历史状态变化
  5. 依赖 Broker:完全依赖 Broker 的可靠性

MQTT 保留消息是物联网应用中非常重要的机制,合理使用可以有效实现状态同步和初始化,提高用户体验和系统可靠性。

标签:MQTT