乐闻世界logo
搜索文章和话题

面试题手册

MobX 中的中间件和拦截器如何使用?

MobX 的中间件和拦截器提供了强大的功能,可以在状态变化前后执行自定义逻辑。以下是 MobX 中间件和拦截器的详细使用方法:1. 拦截器(Intercept)基本用法拦截器允许在状态变化之前拦截和修改操作。import { observable, intercept } from 'mobx';const store = observable({ count: 0, items: []});// 拦截 count 的变化const dispose = intercept(store, 'count', (change) => { console.log('Before change:', change); // 可以修改变化 if (change.newValue < 0) { change.newValue = 0; // 不允许负数 } // 可以取消变化 if (change.newValue > 100) { return null; // 取消变化 } return change; // 允许变化});store.count = 5; // Before change: { type: 'update', object: store, name: 'count', newValue: 5 }console.log(store.count); // 5store.count = -1; // Before change: { type: 'update', object: store, name: 'count', newValue: -1 }console.log(store.count); // 0 (被拦截器修改)store.count = 101; // Before change: { type: 'update', object: store, name: 'count', newValue: 101 }console.log(store.count); // 0 (被拦截器取消)dispose(); // 清理拦截器拦截数组操作const items = observable([1, 2, 3]);intercept(items, (change) => { console.log('Array change:', change); // 拦截 push 操作 if (change.type === 'add') { if (typeof change.newValue !== 'number') { throw new Error('Only numbers allowed'); } } return change;});items.push(4); // Array change: { type: 'add', object: items, name: '3', newValue: 4 }items.push('invalid'); // Error: Only numbers allowed拦截 Map 操作const map = observable(new Map());intercept(map, (change) => { console.log('Map change:', change); // 拦截 set 操作 if (change.type === 'update' || change.type === 'add') { if (change.name === 'secret') { throw new Error('Cannot set secret key'); } } return change;});map.set('key1', 'value1'); // Map change: { type: 'add', object: map, name: 'key1', newValue: 'value1' }map.set('secret', 'value'); // Error: Cannot set secret key2. 观察器(Observe)基本用法观察器允许在状态变化后执行自定义逻辑。import { observable, observe } from 'mobx';const store = observable({ count: 0, items: []});// 观察 count 的变化const dispose = observe(store, 'count', (change) => { console.log('After change:', change); console.log('Old value:', change.oldValue); console.log('New value:', change.newValue);});store.count = 5;// After change: { type: 'update', object: store, name: 'count', oldValue: 0, newValue: 5 }dispose(); // 清理观察器观察数组变化const items = observable([1, 2, 3]);observe(items, (change) => { console.log('Array changed:', change); if (change.type === 'splice') { console.log('Added:', change.added); console.log('Removed:', change.removed); console.log('Index:', change.index); }});items.push(4);// Array changed: { type: 'splice', object: items, added: [4], removed: [], index: 3 }items.splice(1, 1);// Array changed: { type: 'splice', object: items, added: [], removed: [2], index: 1 }观察对象的所有变化const store = observable({ count: 0, name: 'John', items: []});// 观察所有属性的变化const dispose = observe(store, (change) => { console.log(`${change.name} changed from ${change.oldValue} to ${change.newValue}`);});store.count = 1; // count changed from 0 to 1store.name = 'Jane'; // name changed from John to Janedispose();3. 中间件(Middleware)创建自定义中间件import { observable, action, runInAction } from 'mobx';function loggingMiddleware(store, methodName, actionFn) { return function(...args) { console.log(`[Action] ${methodName} called with:`, args); const startTime = performance.now(); const result = actionFn.apply(this, args); const endTime = performance.now(); console.log(`[Action] ${methodName} completed in ${endTime - startTime}ms`); return result; };}class Store { @observable count = 0; constructor() { makeAutoObservable(this); } @action increment = () => { this.count++; }; @action decrement = () => { this.count--; };}// 应用中间件const store = new Store();const originalIncrement = store.increment.bind(store);store.increment = loggingMiddleware(store, 'increment', originalIncrement);使用 action 钩子import { action, configure } from 'mobx';configure({ // 启用 action 钩子 enforceActions: 'always'});// 全局 action 钩子const originalAction = action.bound;action.bound = function(target, propertyKey, descriptor) { console.log(`[Action] ${propertyKey} is being defined`); return originalAction(target, propertyKey, descriptor);};错误处理中间件function errorHandlingMiddleware(store, methodName, actionFn) { return async function(...args) { try { return await actionFn.apply(this, args); } catch (error) { console.error(`[Error] ${methodName} failed:`, error); // 可以将错误存储到 store 中 if (store.errorStore) { store.errorStore.addError(error); } throw error; } };}class Store { @observable data = null; @observable error = null; constructor() { makeAutoObservable(this); } @action fetchData = async () => { this.data = await fetch('/api/data').then(r => r.json()); };}// 应用错误处理中间件const store = new Store();store.fetchData = errorHandlingMiddleware(store, 'fetchData', store.fetchData);4. 使用拦截器和观察器实现撤销/重做class HistoryStore { @observable past = []; @observable future = []; constructor(targetStore) { this.targetStore = targetStore; makeAutoObservable(this); this.setupInterceptors(); } setupInterceptors() { // 拦截所有状态变化 Object.keys(this.targetStore).forEach(key => { if (this.targetStore[key] && typeof this.targetStore[key] === 'object') { intercept(this.targetStore, key, (change) => { // 保存当前状态到 past this.past.push({ key, oldValue: change.oldValue, timestamp: Date.now() }); // 清空 future this.future = []; return change; }); } }); } @action undo = () => { if (this.past.length === 0) return; const lastChange = this.past.pop(); this.future.push(lastChange); // 恢复旧值 this.targetStore[lastChange.key] = lastChange.oldValue; }; @action redo = () => { if (this.future.length === 0) return; const nextChange = this.future.pop(); this.past.push(nextChange); // 恢复新值 this.targetStore[nextChange.key] = nextChange.newValue; }; @computed get canUndo() { return this.past.length > 0; } @computed get canRedo() { return this.future.length > 0; }}5. 性能监控中间件function performanceMonitoringMiddleware(store, methodName, actionFn) { return function(...args) { const startTime = performance.now(); const result = actionFn.apply(this, args); const endTime = performance.now(); const duration = endTime - startTime; // 记录性能数据 if (!store.performanceMetrics) { store.performanceMetrics = {}; } if (!store.performanceMetrics[methodName]) { store.performanceMetrics[methodName] = { count: 0, totalTime: 0, maxTime: 0, minTime: Infinity }; } const metrics = store.performanceMetrics[methodName]; metrics.count++; metrics.totalTime += duration; metrics.maxTime = Math.max(metrics.maxTime, duration); metrics.minTime = Math.min(metrics.minTime, duration); // 警告慢操作 if (duration > 100) { console.warn(`[Performance] ${methodName} took ${duration.toFixed(2)}ms`); } return result; };}6. 权限控制中间件function permissionMiddleware(store, methodName, actionFn, permissions) { return function(...args) { const user = store.userStore?.user; if (!user) { throw new Error('User not authenticated'); } if (permissions && !user.permissions.includes(permissions)) { throw new Error(`User does not have permission: ${permissions}`); } return actionFn.apply(this, args); };}class Store { @observable data = []; constructor() { makeAutoObservable(this); } @action addItem = (item) => { this.data.push(item); }; @action deleteItem = (id) => { this.data = this.data.filter(item => item.id !== id); };}// 应用权限中间件const store = new Store();store.addItem = permissionMiddleware(store, 'addItem', store.addItem, 'write');store.deleteItem = permissionMiddleware(store, 'deleteItem', store.deleteItem, 'delete');7. 日志记录中间件function loggingMiddleware(store, methodName, actionFn) { return function(...args) { const logEntry = { timestamp: new Date().toISOString(), action: methodName, args: JSON.parse(JSON.stringify(args)), result: null, error: null }; try { const result = actionFn.apply(this, args); logEntry.result = JSON.parse(JSON.stringify(result)); return result; } catch (error) { logEntry.error = { message: error.message, stack: error.stack }; throw error; } finally { // 将日志发送到服务器或存储到本地 if (store.logStore) { store.logStore.addLog(logEntry); } } };}8. 防抖和节流中间件function debounceMiddleware(store, methodName, actionFn, delay = 300) { let timeoutId = null; return function(...args) { if (timeoutId) { clearTimeout(timeoutId); } timeoutId = setTimeout(() => { actionFn.apply(this, args); timeoutId = null; }, delay); };}function throttleMiddleware(store, methodName, actionFn, delay = 300) { let lastCallTime = 0; return function(...args) { const now = Date.now(); const timeSinceLastCall = now - lastCallTime; if (timeSinceLastCall >= delay) { actionFn.apply(this, args); lastCallTime = now; } };}class SearchStore { @observable query = ''; @observable results = []; constructor() { makeAutoObservable(this); } @action performSearch = async (query) => { this.results = await api.search(query); };}// 应用防抖中间件const searchStore = new SearchStore();searchStore.performSearch = debounceMiddleware( searchStore, 'performSearch', searchStore.performSearch, 300);总结MobX 的中间件和拦截器提供了强大的功能:拦截器:在状态变化前拦截和修改操作观察器:在状态变化后执行自定义逻辑中间件:包装 action 以添加额外功能常见应用:撤销/重做、性能监控、权限控制、日志记录、防抖节流正确使用这些功能,可以构建更强大、更灵活的 MobX 应用。
阅读 0·2月21日 15:49

MQTT 的主题通配符有哪些?如何使用?

MQTT 的主题通配符是一种强大的订阅机制,允许客户端订阅一类主题而不是单个主题,提高了订阅的灵活性。主题通配符类型MQTT 提供两种通配符:1. 单级通配符(+)符号:加号(+)作用:匹配主题中的单个层级位置:可以出现在主题的任何位置限制:不能匹配空层级2. 多级通配符(#)符号:井号(#)作用:匹配主题中的多个层级(包括零个)位置:必须出现在主题的最后限制:必须是主题过滤器的最后一个字符通配符使用示例单级通配符(+)示例订阅主题:home/+/temperature匹配的主题:- home/livingroom/temperature ✓- home/bedroom/temperature ✓- home/kitchen/temperature ✓不匹配的主题:- home/livingroom/kitchen/temperature ✗(层级过多)- home/temperature ✗(层级过少)- home/livingroom/humidity ✗(最后一层不匹配)订阅主题:sensor/+/data/+匹配的主题:- sensor/001/data/temperature ✓- sensor/002/data/humidity ✓- sensor/003/data/pressure ✓不匹配的主题:- sensor/001/data ✗(层级过少)- sensor/001/data/temperature/value ✗(层级过多)多级通配符(#)示例订阅主题:home/#匹配的主题:- home/ ✓- home/livingroom ✓- home/livingroom/temperature ✓- home/livingroom/temperature/value ✓- home/bedroom/humidity ✓不匹配的主题:- home ✗(必须以 / 结尾或包含 /)- office/livingroom ✗(第一层不匹配)订阅主题:sensor/+/#匹配的主题:- sensor/001/ ✓- sensor/001/data ✓- sensor/001/data/temperature ✓- sensor/002/data/humidity/value ✓不匹配的主题:- sensor ✗(层级过少)- office/001/data ✗(第一层不匹配)组合使用示例订阅主题:home/+/sensors/#匹配的主题:- home/livingroom/sensors/ ✓- home/livingroom/sensors/temperature ✓- home/livingroom/sensors/temperature/value ✓- home/bedroom/sensors/humidity ✓不匹配的主题:- home/sensors/ ✗(缺少中间层级)- home/livingroom/sensors ✗(# 必须在最后)通配符规则单级通配符(+)规则匹配单个层级:只能匹配一个非空层级可以多次使用:可以在主题过滤器中多次出现可以出现在任何位置:可以在主题的任何层级使用不能跨层级:不能匹配多个层级多级通配符(#)规则匹配多个层级:可以匹配零个或多个层级必须在最后:必须是主题过滤器的最后一个字符只能使用一次:每个主题过滤器中只能使用一次必须跟随 /:如果主题有多个层级,# 前面必须有 /通配符应用场景1. 设备分类监控场景:监控所有温度传感器订阅主题:sensors/+/temperature效果:接收所有设备的温度数据2. 区域监控场景:监控某个区域的所有数据订阅主题:building/floor1/#效果:接收一楼所有设备的数据3. 设备状态监控场景:监控所有设备的在线状态订阅主题:device/+/status效果:接收所有设备的状态更新4. 数据类型订阅场景:订阅所有告警消息订阅主题:alert/#效果:接收所有类型的告警5. 分层订阅场景:订阅特定类型的所有子主题订阅主题:system/metrics/+/#效果:接收所有系统指标的详细数据通配符限制和注意事项1. 发布限制不能发布到通配符主题:通配符只能用于订阅,不能用于发布主题必须明确:发布时必须指定完整的主题路径2. 订阅限制通配符不能用于主题层级内部:如 home/room+/temperature 是无效的# 必须在最后:home/#/temperature 是无效的+ 不能匹配空:home/+/temperature 不能匹配 home//temperature3. 性能考虑通配符订阅会增加 Broker 负担:Broker 需要进行主题匹配避免过度使用通配符:过多的通配符订阅可能影响性能合理设计主题结构:良好的主题设计可以减少通配符使用4. 安全考虑ACL 权限控制:通配符订阅需要相应的权限避免过度授权:通配符订阅可能暴露过多数据最小权限原则:只授予必要的通配符权限代码示例Python (paho-mqtt)import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 单级通配符订阅 client.subscribe("sensor/+/temperature") # 多级通配符订阅 client.subscribe("home/bedroom/#") # 组合通配符订阅 client.subscribe("system/+/metrics/#")def on_message(client, userdata, msg): print(f"Received: {msg.topic} - {msg.payload.decode()}")client = mqtt.Client()client.on_connect = on_connectclient.on_message = on_messageclient.connect("broker.example.com", 1883, 60)client.loop_forever()JavaScript (MQTT.js)const mqtt = require('mqtt');const client = mqtt.connect('mqtt://broker.example.com');client.on('connect', () => { console.log('Connected'); // 单级通配符订阅 client.subscribe('sensor/+/temperature'); // 多级通配符订阅 client.subscribe('home/bedroom/#'); // 组合通配符订阅 client.subscribe('system/+/metrics/#');});client.on('message', (topic, message) => { console.log(`Received: ${topic} - ${message.toString()}`);});最佳实践1. 主题设计层级清晰:主题层级应该清晰、有意义避免过深:主题层级不宜过深(建议不超过 5 层)使用分隔符:统一使用 / 作为分隔符命名规范:使用一致的命名规范2. 通配符使用按需使用:只在需要时使用通配符避免过度通配:避免使用过于宽泛的通配符(如 #)组合使用:合理组合单级和多级通配符性能优化:在高性能场景下减少通配符使用3. 订阅管理及时取消订阅:不再需要的订阅应该及时取消避免重复订阅:避免重复订阅相同的主题监控订阅数量:监控订阅数量,避免过多订阅4. 安全管理权限控制:为通配符订阅设置适当的权限最小权限:只授予必要的最小权限定期审查:定期审查通配符订阅权限通配符性能优化1. Broker 优化选择合适的 Broker:选择支持高效通配符匹配的 Broker配置优化:根据实际需求调整 Broker 配置集群部署:大规模场景下使用集群部署2. 订阅优化精确订阅优先:优先使用精确主题订阅减少通配符层级:减少通配符匹配的层级批量订阅:使用批量订阅减少连接开销3. 主题优化主题前缀:使用主题前缀减少匹配范围主题分组:合理分组主题,减少通配符使用避免通配符嵌套:避免复杂的通配符嵌套MQTT 主题通配符是提高订阅灵活性的重要机制,合理使用可以简化应用逻辑,提高开发效率。但需要注意性能和安全问题,避免过度使用通配符。
阅读 0·2月21日 15:45

MQTT 和 HTTP 协议有什么区别?分别在什么场景下使用?

MQTT 与 HTTP 是两种常用的网络协议,它们在设计理念、应用场景和技术特点上有显著差异。设计理念对比MQTT设计目标:轻量级、低带宽、低功耗的消息传输协议通信模式:发布/订阅模式,一对多通信连接方式:长连接,保持持久连接传输方向:双向通信,服务器可以主动推送消息协议栈:应用层协议,基于 TCPHTTP设计目标:请求/响应模式的数据传输协议通信模式:客户端-服务器模式,一对一通信连接方式:短连接(HTTP/1.0)或长连接(HTTP/1.1 Keep-Alive)传输方向:单向通信,客户端主动请求协议栈:应用层协议,基于 TCP技术特点对比1. 消息传输| 特性 | MQTT | HTTP ||-----|------|------|| 传输模式 | 发布/订阅 | 请求/响应 || 消息方向 | 双向 | 单向(客户端→服务器) || 实时性 | 高 | 低(需要轮询或 WebSocket) || 消息大小 | 小(头部最小 2 字节) | 大(头部通常几百字节) || 带宽消耗 | 低 | 高 |2. 连接管理| 特性 | MQTT | HTTP ||-----|------|------|| 连接类型 | 长连接 | 短连接/长连接 || 连接保持 | Keep Alive 机制 | Keep-Alive(HTTP/1.1+) || 断线重连 | 自动重连 | 需要应用层处理 || 心跳机制 | 内置 PING/PONG | 无(需应用层实现) |3. 服务质量(QoS)| 特性 | MQTT | HTTP ||-----|------|------|| QoS 级别 | 3 级(0/1/2) | 无(依赖 TCP) || 消息确认 | 支持(PUBACK/PUBREC/PUBCOMP) | 无(依赖 TCP ACK) || 消息重传 | 支持 | 无(依赖 TCP 重传) || 消息顺序 | 保证 | 保证(TCP) |4. 安全性| 特性 | MQTT | HTTP ||-----|------|------|| 加密支持 | TLS/SSL(端口 8883) | HTTPS(端口 443) || 认证方式 | 用户名/密码、证书、Token | Basic Auth、Digest、OAuth || 访问控制 | ACL(主题级别) | 基于路径、权限系统 || 数据完整性 | 保证 | 保证 |性能对比1. 资源消耗| 指标 | MQTT | HTTP ||-----|------|------|| CPU 占用 | 低 | 中等 || 内存占用 | 低 | 中等 || 网络带宽 | 低 | 高 || 电池消耗 | 低 | 高 || 数据包大小 | 小 | 大 |2. 并发能力| 指标 | MQTT | HTTP ||-----|------|------|| 单连接消息数 | 高 | 低 || 并发连接数 | 高(百万级) | 中等(万级) || 消息吞吐量 | 高 | 中等 || 延迟 | 低(毫秒级) | 中等(百毫秒级) |应用场景对比MQTT 适用场景物联网设备传感器数据采集智能家居控制工业自动化车联网实时通信即时消息实时监控在线游戏聊天应用推送通知移动应用推送消息通知警报系统HTTP 适用场景Web 应用网页浏览API 调用文件下载表单提交数据传输RESTful API文件上传/下载大数据传输媒体流企业应用企业系统集成微服务通信数据同步业务流程代码示例对比MQTT 消息发布import paho.mqtt.client as mqttclient = mqtt.Client()client.connect("broker.example.com", 1883)client.publish("sensor/temperature", "25.5")client.disconnect()HTTP 请求import requestsresponse = requests.post( "https://api.example.com/sensor/temperature", json={"value": 25.5})print(response.status_code)优缺点总结MQTT 优点轻量级,适合资源受限设备低带宽,低功耗实时性好,支持双向通信支持一对多消息分发内置 QoS 保证适合物联网场景MQTT 缺点不适合大数据传输不适合文件传输不适合复杂查询生态系统相对较小HTTP 优点通用性强,生态丰富支持大数据传输支持复杂查询标准化程度高易于调试和监控支持缓存HTTP 缺点头部开销大实时性差(需要轮询)不适合低带宽环境服务器不能主动推送资源消耗较高选择建议选择 MQTT 的情况需要实时双向通信设备资源受限(低带宽、低功耗)需要一对多消息分发物联网应用需要离线消息支持网络不稳定环境选择 HTTP 的情况需要传输大数据需要复杂查询和过滤Web 应用开发RESTful API 设计需要广泛的工具支持需要缓存机制混合使用在实际应用中,可以结合使用 MQTT 和 HTTP:MQTT:用于实时数据传输、设备控制、状态更新HTTP:用于配置管理、数据查询、文件传输、API 访问例如:传感器数据通过 MQTT 实时上报历史数据查询通过 HTTP API设备配置通过 HTTP RESTful API告警通知通过 MQTT 实时推送MQTT 和 HTTP 各有优势,根据具体应用场景选择合适的协议,或者结合使用以发挥各自优势。
阅读 0·2月21日 15:45

MQTT Broker 的主要功能有哪些?常用的 MQTT Broker 实现有哪些?

MQTT Broker 是 MQTT 协议的核心组件,负责消息的接收、路由和分发。以下是 MQTT Broker 的主要功能和常用实现。Broker 的核心功能1. 连接管理客户端连接:接受和处理来自客户端的连接请求认证授权:验证客户端身份,控制访问权限会话管理:维护客户端会话状态心跳检测:通过 Keep Alive 机制检测客户端存活状态2. 消息路由主题匹配:根据订阅关系匹配消息主题消息分发:将消息转发给订阅该主题的所有客户端QoS 处理:确保消息按照指定的 QoS 级别传递消息过滤:支持基于主题和内容的消息过滤3. 持久化存储离线消息:存储离线客户端的订阅消息消息队列:临时存储待分发的消息会话状态:保存客户端的订阅关系和未确认消息消息日志:记录消息传输历史4. 安全机制TLS/SSL 加密:保护数据传输安全用户认证:支持用户名/密码、证书等多种认证方式访问控制:基于主题和客户端的权限管理ACL(访问控制列表):细粒度的权限控制5. 性能优化消息压缩:减少网络传输开销批量处理:提高消息处理效率负载均衡:支持集群部署,分散请求压力连接池:复用网络连接,降低资源消耗常用 MQTT Broker 实现1. Mosquitto特点:轻量级、开源、易于部署语言:C 语言实现适用场景:嵌入式设备、小型项目优点:资源占用少配置简单社区活跃缺点:性能相对较低企业级功能有限2. EMQX特点:高性能、分布式、企业级语言:Erlang/OTP 实现适用场景:大规模物联网平台、企业应用优点:支持百万级并发连接内置规则引擎丰富的管理界面支持集群和负载均衡缺点:学习曲线较陡资源占用较高3. HiveMQ特点:商业级、高性能、可扩展语言:Java 实现适用场景:企业级应用、金融、医疗优点:高性能和稳定性企业级支持和服务丰富的插件生态缺点:商业版本收费资源占用较高4. VerneMQ特点:高性能、分布式、可扩展语言:Erlang 实现适用场景:大规模实时通信优点:高并发支持灵活的插件系统支持集群部署缺点:文档相对较少社区规模较小5. RabbitMQ(MQTT 插件)特点:多功能消息代理语言:Erlang 实现适用场景:需要多种协议支持的系统优点:支持多种协议(AMQP、MQTT、STOMP)成熟稳定丰富的管理工具缺点:MQTT 功能相对基础性能不如专用 BrokerBroker 选择建议小型项目/原型开发推荐:Mosquitto理由:轻量、简单、免费中型项目/企业应用推荐:EMQX 社区版理由:功能丰富、性能良好、免费大规模物联网平台推荐:EMQX 企业版或 HiveMQ理由:高性能、企业级支持、可扩展需要多协议支持推荐:RabbitMQ理由:协议支持全面、成熟稳定性能指标对比| Broker | 并发连接数 | 消息吞吐量 | 资源占用 | 部署复杂度 ||--------|-----------|-----------|---------|-----------|| Mosquitto | 1万+ | 10万+ | 低 | 简单 || EMQX | 100万+ | 100万+ | 中 | 中等 || HiveMQ | 100万+ | 100万+ | 高 | 中等 || VerneMQ | 100万+ | 100万+ | 中 | 中等 || RabbitMQ | 10万+ | 10万+ | 中 | 中等 |选择 MQTT Broker 时,需要综合考虑项目规模、性能要求、预算和技术团队能力等因素。
阅读 0·2月21日 15:45

MQTT 5.0 相比 MQTT 3.1.1 有哪些新特性?

MQTT 5.0 版本在 3.1.1 版本的基础上进行了重大改进,引入了许多新特性,显著提升了协议的功能性和灵活性。MQTT 5.0 主要新特性1. 属性(Properties)定义:在控制报文中携带键值对形式的元数据作用:扩展协议功能,无需修改协议格式应用场景:消息过期时间请求/响应模式订阅标识符内容类型用户属性(自定义元数据)2. 请求/响应模式(Request/Response Pattern)相关属性:Response Topic:指定响应消息的主题Correlation Data:关联请求和响应工作流程:客户端发送请求消息,包含 Response Topic 和 Correlation Data服务端处理请求服务端发送响应消息到 Response Topic,包含相同的 Correlation Data客户端根据 Correlation Data 匹配响应优势:简化应用层实现减少自定义协议开发提高互操作性3. 会话和消息过期会话过期:Session Expiry Interval:指定会话过期时间(秒)0 表示立即过期,4294967295 表示永不过期替代了 Clean Session 标志消息过期:Message Expiry Interval:指定消息过期时间(秒)Broker 不再分发过期消息减少无效消息传输优势:更灵活的会话管理自动清理过期资源减少存储压力4. 共享订阅(Shared Subscriptions)语法:$share/<group>/<topic>示例:$share/consumer1/sensor/data工作原理:多个订阅者组成一个共享组每条消息只分发给组中的一个订阅者实现负载均衡优势:提高消息处理能力实现消费者扩展避免消息重复处理应用场景:高吞吐量数据处理分布式任务处理微服务架构5. 订阅标识符(Subscription Identifier)定义:为订阅分配一个数字标识符特点:每个客户端可以有多个订阅标识符标识符范围:1-268435455在 PUBLISH 报文中返回匹配的订阅标识符应用场景:区分不同的订阅实现复杂的消息路由简化应用逻辑6. 主题别名(Topic Alias)定义:用数字代替完整的主题字符串机制:客户端和 Broker 独立维护别名映射别名范围:1-65535在 CONNECT 或 PUBLISH 中声明优势:减少网络传输量降低带宽消耗提高传输效率应用场景:长主题名称高频消息传输带宽受限环境7. 流量控制(Flow Control)接收最大值(Receive Maximum):客户端指定未确认 PUBLISH 报文的最大数量防止消息积压默认值:65535最大数据包大小(Maximum Packet Size):限制最大数据包大小防止大包攻击默认值:无限制优势:防止资源耗尽提高系统稳定性适应不同网络条件8. 原因码(Reason Codes)定义:更详细的错误和状态信息范围:0x00-0xFF分类:成功码(0x00-0x00)错误码(0x80-0xFF)优势:更精确的错误诊断更好的问题排查改进的互操作性9. 认证增强(Enhanced Authentication)认证方法(Authentication Method):指定认证方法(如 SCRAM)支持多种认证协议认证数据(Authentication Data):携带认证相关的数据支持多轮认证重新认证(Re-authentication):在连接期间重新认证无需断开连接优势:更灵活的认证机制支持现代认证协议提高安全性10. 服务器断开(Server Disconnect)功能:服务器主动断开客户端连接原因码:说明断开原因服务器引用:提供服务器信息应用场景:服务器维护强制下线负载均衡MQTT 3.1.1 vs MQTT 5.0 对比| 特性 | MQTT 3.1.1 | MQTT 5.0 ||-----|-----------|----------|| 属性支持 | 无 | 支持 || 请求/响应 | 自定义实现 | 原生支持 || 会话管理 | Clean Session | Session Expiry || 共享订阅 | Broker 扩展 | 标准特性 || 主题别名 | 无 | 支持 || 流量控制 | 无 | 支持 || 错误码 | 简单 | 详细 || 认证机制 | 基础 | 增强 || 消息过期 | 无 | 支持 || 服务器断开 | 无 | 支持 |迁移建议向后兼容性MQTT 5.0 客户端可以连接到 MQTT 3.1.1 BrokerMQTT 3.1.1 客户端可以连接到 MQTT 5.0 Broker新特性仅在双方都支持时生效迁移策略评估需求:确定是否需要新特性逐步迁移:先升级 Broker,再升级客户端测试验证:充分测试兼容性和功能监控观察:监控迁移后的系统表现应用场景适合使用 MQTT 5.0 的场景需要请求/响应模式的应用高并发、高吞吐量的物联网平台需要精确错误诊断的系统需要灵活认证机制的企业应用带宽受限的物联网设备可以继续使用 MQTT 3.1.1 的场景简单的传感器数据采集低频率的消息传输已有稳定运行的系统资源极度受限的设备MQTT 5.0 的引入显著提升了协议的功能性和灵活性,为更复杂的物联网应用提供了更好的支持。
阅读 0·2月21日 15:45

MobX 的依赖追踪系统是如何工作的?

MobX 的依赖追踪系统是其核心机制,它通过细粒度的追踪实现了高效的响应式更新。以下是 MobX 依赖追踪的详细工作原理:依赖追踪的基本原理MobX 使用观察者模式和依赖图来实现依赖追踪。当 observable 被访问时,MobX 会建立依赖关系;当 observable 被修改时,MobX 会通知所有依赖它的观察者。核心组件1. Reaction(反应)Reaction 是依赖追踪的执行单元,包括:autorun:立即执行,并在依赖变化时自动重新执行reaction:提供更细粒度的控制,可以指定追踪函数和效果函数observer(React 组件):包装 React 组件,使其能够响应状态变化computed:计算属性,也是一种特殊的 reaction2. Derivation(派生)Derivation 表示依赖于 observable 的计算或副作用。每个 derivation 维护一个依赖列表。3. Atom(原子)Atom 是最小的可观察单元,每个 observable 对象、数组、Map 等都由多个 atom 组成。依赖追踪的执行流程1. 追踪阶段(Tracing)当 reaction 执行时:autorun(() => { console.log(store.count); // 访问 observable});执行步骤:MobX 将当前 reaction 标记为"正在追踪"当访问 store.count 时,MobX 记录下这个 reaction 依赖于 count 这个 atom继续执行,记录所有访问的 observable执行完成后,reaction 进入"稳定"状态2. 通知阶段(Notification)当 observable 被修改时:runInAction(() => { store.count++; // 修改 observable});执行步骤:MobX 检测到 count atom 被修改查找所有依赖于 count 的 reaction将这些 reaction 标记为"过时"(stale)在下一个事件循环中,重新执行这些 reaction依赖图的结构MobX 维护一个双向的依赖图:Atom → Derivation:每个 atom 知道哪些 derivation 依赖于它Derivation → Atom:每个 derivation 知道自己依赖于哪些 atom这种双向关系使得 MobX 能够高效地进行依赖更新和清理。细粒度更新MobX 的依赖追踪是细粒度的,这意味着:只更新真正需要更新的部分避免不必要的重新计算和重新渲染自动处理嵌套的依赖关系示例:class Store { @observable firstName = 'John'; @observable lastName = 'Doe'; @observable age = 30; @computed get fullName() { return `${this.firstName} ${this.lastName}`; }}const observerComponent = observer(() => { // 只依赖 fullName,不依赖 age return <div>{store.fullName}</div>;});当 age 变化时,组件不会重新渲染;只有当 firstName 或 lastName 变化时才会重新渲染。批量更新MobX 会自动批量更新,避免多次触发 reaction:runInAction(() => { store.firstName = 'Jane'; store.lastName = 'Smith'; store.age = 25;});即使修改了多个 observable,相关的 reaction 只会执行一次。依赖清理当 reaction 不再需要时,MobX 会自动清理依赖关系:组件卸载时,observer 会自动清理使用 dispose() 方法手动清理 reaction避免内存泄漏性能优化MobX 的依赖追踪系统提供了多种性能优化:懒计算:computed 只在需要时才计算缓存机制:computed 的结果会被缓存批量更新:多个状态变化合并为一次更新细粒度追踪:只追踪真正需要的依赖调试依赖追踪MobX 提供了调试工具来查看依赖关系:import { trace } from 'mobx';// 追踪 computed 的依赖trace(store.fullName);// 追踪 reaction 的依赖autorun(() => { console.log(store.count);}, { name: 'myReaction' });常见问题1. 循环依赖MobX 能够检测和避免循环依赖,但设计时应尽量避免。2. 过度追踪避免在循环或条件中访问 observable,这可能导致不必要的依赖。3. 内存泄漏确保在组件卸载时清理 reaction,避免内存泄漏。总结MobX 的依赖追踪系统通过观察者模式和依赖图实现了高效的响应式更新。理解这个系统的工作原理有助于编写更高效的 MobX 代码,并避免常见的性能问题。
阅读 0·2月21日 15:45

如何测试 MobX 应用?

MobX 的测试策略和工具对于构建可靠的应用至关重要。以下是 MobX 测试的完整指南:1. 测试 Store基本测试import { UserStore } from './UserStore';describe('UserStore', () => { let store; beforeEach(() => { store = new UserStore(); }); it('should initialize with default values', () => { expect(store.user).toBeNull(); expect(store.isAuthenticated).toBe(false); }); it('should login user', async () => { await store.login({ username: 'test', password: 'test' }); expect(store.user).not.toBeNull(); expect(store.isAuthenticated).toBe(true); }); it('should logout user', () => { store.user = { id: 1, name: 'Test' }; store.isAuthenticated = true; store.logout(); expect(store.user).toBeNull(); expect(store.isAuthenticated).toBe(false); });});测试 computed 属性describe('ProductStore', () => { let store; beforeEach(() => { store = new ProductStore(); }); it('should compute featured products', () => { store.products = [ { id: 1, name: 'Product 1', featured: true }, { id: 2, name: 'Product 2', featured: false }, { id: 3, name: 'Product 3', featured: true } ]; expect(store.featuredProducts).toHaveLength(2); expect(store.featuredProducts[0].name).toBe('Product 1'); expect(store.featuredProducts[1].name).toBe('Product 3'); }); it('should update when products change', () => { store.products = [{ id: 1, name: 'Product 1', featured: true }]; expect(store.featuredProducts).toHaveLength(1); store.products.push({ id: 2, name: 'Product 2', featured: true }); expect(store.featuredProducts).toHaveLength(2); });});测试异步 actiondescribe('AsyncStore', () => { let store; let mockApi; beforeEach(() => { mockApi = { fetchData: jest.fn().mockResolvedValue({ data: 'test' }) }; store = new AsyncStore(mockApi); }); it('should fetch data successfully', async () => { await store.fetchData(); expect(store.data).toEqual({ data: 'test' }); expect(store.loading).toBe(false); expect(mockApi.fetchData).toHaveBeenCalled(); }); it('should handle errors', async () => { mockApi.fetchData.mockRejectedValue(new Error('Network error')); await expect(store.fetchData()).rejects.toThrow('Network error'); expect(store.error).toBe('Network error'); expect(store.loading).toBe(false); });});2. 测试 React 组件测试 observer 组件import { render, screen, fireEvent } from '@testing-library/react';import { observer } from 'mobx-react-lite';import { UserStore } from './UserStore';const TestComponent = observer(({ store }) => ( <div> {store.isAuthenticated ? ( <div>Welcome, {store.user?.name}</div> ) : ( <div>Please login</div> )} <button onClick={store.login}>Login</button> </div>));describe('TestComponent', () => { it('should show login message when not authenticated', () => { const store = new UserStore(); render(<TestComponent store={store} />); expect(screen.getByText('Please login')).toBeInTheDocument(); }); it('should show welcome message when authenticated', () => { const store = new UserStore(); store.user = { id: 1, name: 'Test' }; store.isAuthenticated = true; render(<TestComponent store={store} />); expect(screen.getByText('Welcome, Test')).toBeInTheDocument(); }); it('should update when state changes', () => { const store = new UserStore(); render(<TestComponent store={store} />); expect(screen.getByText('Please login')).toBeInTheDocument(); store.user = { id: 1, name: 'Test' }; store.isAuthenticated = true; expect(screen.getByText('Welcome, Test')).toBeInTheDocument(); });});测试表单组件describe('FormComponent', () => { it('should update form data', () => { const store = new FormStore(); render(<FormComponent store={store} />); const input = screen.getByLabelText('Name'); fireEvent.change(input, { target: { value: 'John' } }); expect(store.formData.name).toBe('John'); }); it('should submit form', async () => { const store = new FormStore(); store.submit = jest.fn(); render(<FormComponent store={store} />); const button = screen.getByText('Submit'); fireEvent.click(button); expect(store.submit).toHaveBeenCalled(); });});3. 使用 MobX 测试工具使用 spyimport { spy } from 'mobx';describe('Spy Usage', () => { it('should spy on observable changes', () => { const store = observable({ count: 0 }); const countSpy = jest.fn(); spy(store, 'count', (change) => { countSpy(change); }); store.count = 1; store.count = 2; expect(countSpy).toHaveBeenCalledTimes(2); });});使用 traceimport { trace } from 'mobx';describe('Trace Usage', () => { it('should trace computed dependencies', () => { const store = observable({ firstName: 'John', lastName: 'Doe' }); const fullName = computed(() => `${store.firstName} ${store.lastName}`); // 追踪依赖 trace(fullName); expect(fullName.get()).toBe('John Doe'); });});使用 isObservableimport { isObservable } from 'mobx';describe('IsObservable Usage', () => { it('should check if object is observable', () => { const observableObj = observable({ count: 0 }); const plainObj = { count: 0 }; expect(isObservable(observableObj)).toBe(true); expect(isObservable(plainObj)).toBe(false); });});4. Mock API 调用使用 Jest mockimport { UserStore } from './UserStore';describe('UserStore with API', () => { let store; let mockApi; beforeEach(() => { mockApi = { login: jest.fn(), logout: jest.fn(), fetchUser: jest.fn() }; store = new UserStore(mockApi); }); it('should call API on login', async () => { mockApi.login.mockResolvedValue({ id: 1, name: 'Test' }); await store.login({ username: 'test', password: 'test' }); expect(mockApi.login).toHaveBeenCalledWith({ username: 'test', password: 'test' }); }); it('should handle API errors', async () => { mockApi.login.mockRejectedValue(new Error('Invalid credentials')); await expect(store.login({ username: 'test', password: 'test' })) .rejects.toThrow('Invalid credentials'); expect(store.error).toBe('Invalid credentials'); });});使用 MSW (Mock Service Worker)import { setupServer, rest } from 'msw';import { UserStore } from './UserStore';const server = setupServer( rest.post('/api/login', (req, res, ctx) => { return res( ctx.status(200), ctx.json({ id: 1, name: 'Test' }) ); }));describe('UserStore with MSW', () => { let store; beforeAll(() => server.listen()); afterEach(() => server.resetHandlers()); afterAll(() => server.close()); beforeEach(() => { store = new UserStore(); }); it('should login successfully', async () => { await store.login({ username: 'test', password: 'test' }); expect(store.user).toEqual({ id: 1, name: 'Test' }); expect(store.isAuthenticated).toBe(true); });});5. 测试 reactiondescribe('Reaction Testing', () => { it('should trigger reaction when observable changes', () => { const store = observable({ count: 0 }); const reactionSpy = jest.fn(); reaction( () => store.count, (count) => { reactionSpy(count); } ); store.count = 1; expect(reactionSpy).toHaveBeenCalledWith(1); store.count = 2; expect(reactionSpy).toHaveBeenCalledWith(2); }); it('should not trigger when value is same', () => { const store = observable({ count: 0 }); const reactionSpy = jest.fn(); reaction( () => store.count, (count) => { reactionSpy(count); } ); store.count = 0; expect(reactionSpy).not.toHaveBeenCalled(); });});6. 测试中间件describe('Middleware Testing', () => { it('should call middleware before action', () => { const middlewareSpy = jest.fn(); const actionSpy = jest.fn(); const store = { @observable count: 0, @action increment() { this.count++; } }; const originalIncrement = store.increment; store.increment = function(...args) { middlewareSpy(...args); return originalIncrement.apply(this, args); }; store.increment(); expect(middlewareSpy).toHaveBeenCalled(); expect(actionSpy).toHaveBeenCalled(); });});7. 集成测试describe('Integration Tests', () => { it('should handle complete user flow', async () => { const store = new RootStore(); // 登录 await store.userStore.login({ username: 'test', password: 'test' }); expect(store.userStore.isAuthenticated).toBe(true); // 加载数据 await store.dataStore.loadData(); expect(store.dataStore.data).not.toBeNull(); // 添加到购物车 store.cartStore.addItem(store.dataStore.data[0]); expect(store.cartStore.items).toHaveLength(1); // 结账 await store.cartStore.checkout(); expect(store.cartStore.items).toHaveLength(0); });});8. 测试最佳实践1. 隔离测试// 每个测试都应该独立beforeEach(() => { store = new Store();});// 清理副作用afterEach(() => { if (store.dispose) { store.dispose(); }});2. 使用快照测试it('should match snapshot', () => { const store = new Store(); store.data = { id: 1, name: 'Test' }; expect(toJS(store.data)).toMatchSnapshot();});3. 测试边界情况it('should handle empty array', () => { const store = new Store(); store.items = []; expect(store.itemCount).toBe(0);});it('should handle null values', () => { const store = new Store(); store.user = null; expect(store.userName).toBe('Guest');});4. 测试错误处理it('should handle network errors gracefully', async () => { const store = new Store(); mockApi.fetchData.mockRejectedValue(new Error('Network error')); await expect(store.fetchData()).rejects.toThrow('Network error'); expect(store.error).toBe('Network error'); expect(store.loading).toBe(false);});总结MobX 测试的关键点:测试 Store:验证 observable、computed 和 action 的行为测试组件:验证 observer 组件的响应性使用测试工具:spy、trace、isObservableMock API:使用 Jest mock 或 MSW测试 reaction:验证副作用是否正确触发测试中间件:验证中间件是否正确执行集成测试:验证多个 store 之间的交互最佳实践:隔离测试、快照测试、边界情况、错误处理遵循这些测试策略,可以构建可靠、可维护的 MobX 应用。
阅读 0·2月21日 15:45

MQTT 的遗嘱消息(Last Will)是什么?如何使用?

MQTT 的遗嘱消息(Last Will and Testament,LWT)是一种重要的机制,用于在客户端异常断开连接时通知其他客户端。遗嘱消息的概念定义遗嘱消息是客户端在连接时预先设置的一条消息,当客户端异常断开连接时,Broker 会自动将这条消息发布到指定的主题。作用异常检测:通知其他客户端某个设备已离线状态通知:发布设备离线状态故障告警:触发告警机制资源清理:通知系统清理相关资源遗嘱消息的工作原理设置遗嘱消息客户端在发送 CONNECT 报文时设置遗嘱消息参数:CONNECT 报文参数:- Will Flag: true(启用遗嘱消息)- Will Topic: 遗嘱消息的主题- Will Message: 遗嘱消息的内容- Will QoS: 遗嘱消息的 QoS 级别- Will Retain: 是否保留遗嘱消息触发条件遗嘱消息在以下情况下会被触发:客户端异常断开网络故障设备断电程序崩溃连接超时Broker 检测到连接断开Keep Alive 超时TCP 连接断开心跳检测失败不触发的情况以下情况不会触发遗嘱消息:正常断开连接客户端发送 DISCONNECT 报文正常关闭连接连接未建立CONNECT 报文发送失败连接被拒绝遗嘱消息的参数Will Flag(遗嘱标志)作用:标识是否启用遗嘱消息值:true/false必需:启用遗嘱消息时必须为 trueWill Topic(遗嘱主题)作用:指定遗嘱消息发布的主题格式:标准的 MQTT 主题字符串示例:device/123/status要求:必须设置Will Message(遗嘱消息内容)作用:遗嘱消息的实际内容格式:二进制数据示例:offline 或 {"status":"offline","timestamp":1234567890}要求:必须设置Will QoS(遗嘱 QoS)作用:指定遗嘱消息的 QoS 级别值:0/1/2默认值:0选择建议:QoS 0:一般状态通知QoS 1:重要状态通知QoS 2:关键状态通知Will Retain(遗嘱保留)作用:指定是否保留遗嘱消息值:true/false默认值:false影响:true:新订阅者会收到遗嘱消息false:只有在线订阅者收到遗嘱消息使用场景1. 设备在线状态监控设备上线:- 发布 "online" 到 device/123/status设备离线(正常):- 发布 "offline" 到 device/123/status设备离线(异常):- 遗嘱消息 "offline" 发布到 device/123/status2. 故障告警遗嘱主题:alert/device/123遗嘱消息:{"type":"offline","device":"123","timestamp":1234567890}监控系统订阅 alert/device/123收到遗嘱消息后触发告警3. 资源清理遗嘱主题:cleanup/device/123遗嘱消息:{"device":"123","action":"cleanup"}清理服务订阅 cleanup/device/123收到遗嘱消息后清理相关资源4. 负载均衡遗嘱主题:worker/offline遗嘱消息:{"worker":"worker1"}负载均衡器订阅 worker/offline收到遗嘱消息后重新分配任务代码示例Python (paho-mqtt)import paho.mqtt.client as mqttimport jsonimport timedef on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") client.subscribe("device/+/status")def on_message(client, userdata, msg): print(f"Received: {msg.topic} - {msg.payload.decode()}")client = mqtt.Client()# 设置遗嘱消息will_topic = "device/123/status"will_message = json.dumps({"status": "offline", "timestamp": int(time.time())})client.will_set(will_topic, will_message, qos=1, retain=True)client.on_connect = on_connectclient.on_message = on_messageclient.connect("broker.example.com", 1883, 60)# 发布在线状态client.publish("device/123/status", json.dumps({"status": "online"}))client.loop_forever()JavaScript (MQTT.js)const mqtt = require('mqtt');const client = mqtt.connect('mqtt://broker.example.com', { will: { topic: 'device/123/status', payload: JSON.stringify({ status: 'offline', timestamp: Date.now() }), qos: 1, retain: true }});client.on('connect', () => { console.log('Connected'); // 发布在线状态 client.publish('device/123/status', JSON.stringify({ status: 'online' })); // 订阅状态主题 client.subscribe('device/+/status');});client.on('message', (topic, message) => { console.log(`Received: ${topic} - ${message.toString()}`);});最佳实践1. 遗嘱消息设计简洁明了:消息内容简洁,易于解析包含时间戳:便于追踪离线时间设备标识:明确标识是哪个设备状态信息:包含详细的离线原因2. 主题命名规范推荐格式:- device/{device_id}/status- alert/{device_id}/offline- cleanup/{device_id}避免使用:- 通配符作为遗嘱主题- 过于复杂的主题结构3. QoS 选择一般设备:QoS 0重要设备:QoS 1关键设备:QoS 24. Retain 设置状态监控:建议设置为 true告警通知:建议设置为 false资源清理:根据需求设置5. 遗嘱消息处理及时处理:收到遗嘱消息后及时处理避免重复:防止重复处理同一设备的离线事件记录日志:记录离线事件,便于问题排查注意事项正常断开:正常断开连接时,应该先发送 DISCONNECT 报文,避免触发遗嘱消息遗嘱消息更新:重新连接时可以更新遗嘱消息内容Broker 限制:某些 Broker 可能对遗嘱消息有大小限制网络延迟:网络延迟可能导致遗嘱消息延迟发送多设备场景:在多设备场景中,需要明确区分不同设备的遗嘱消息遗嘱消息的局限性无法区分离线原因:遗嘱消息不包含具体的离线原因可能误报:网络抖动可能导致误报处理延迟:从离线到发送遗嘱消息可能有延迟依赖 Broker:完全依赖 Broker 的可靠性MQTT 遗嘱消息是物联网应用中非常重要的机制,合理使用可以有效监控设备状态,提高系统的可靠性和可维护性。
阅读 0·2月21日 15:45

MQTT 的发布/订阅模式是如何工作的?

MQTT 的发布/订阅模式是一种消息传递架构,它解耦了消息的生产者和消费者,实现了灵活的一对多通信。核心概念1. 主题(Topic)定义:主题是消息的路由地址,采用层级结构格式:使用斜杠(/)分隔的字符串,如 home/livingroom/temperature特点:层级清晰,便于组织和管理支持通配符订阅大小写敏感长度限制:最多 65535 字节2. 发布者(Publisher)角色:消息的生产者功能:向特定主题发送消息特点:不需要知道订阅者的存在可以同时向多个主题发布消息发布后立即返回,不等待订阅者响应3. 订阅者(Subscriber)角色:消息的消费者功能:订阅感兴趣的主题,接收相关消息特点:可以订阅多个主题可以使用通配符订阅一类主题只接收订阅后发布的消息4. Broker(代理服务器)角色:消息的中转站和路由器功能:接收发布者发送的消息根据订阅关系将消息分发给订阅者管理客户端连接和会话处理消息的 QoS 保证工作流程连接建立:客户端(发布者/订阅者)连接到 Broker订阅主题:订阅者向 Broker 发送订阅请求发布消息:发布者向特定主题发送消息消息路由:Broker 接收消息,查找订阅该主题的客户端消息分发:Broker 将消息转发给所有订阅者消息接收:订阅者接收并处理消息通配符订阅单级通配符(+)匹配单个层级示例:home/+/temperature 匹配 home/livingroom/temperature,但不匹配 home/livingroom/kitchen/temperature多级通配符(#)匹配多个层级,必须放在主题末尾示例:home/# 匹配 home/ 下的所有主题优势解耦性:发布者和订阅者完全解耦,互不依赖灵活性:支持一对多、多对一、多对多的通信模式可扩展性:易于添加新的发布者和订阅者异步性:发布者不需要等待订阅者响应高效性:Broker 负责消息路由,减少网络开销与点对点模式的对比| 特性 | 发布/订阅模式 | 点对点模式 ||-----|-------------|-----------|| 耦合度 | 低 | 高 || 消息接收者 | 多个 | 一个 || 消息持久化 | 可选 | 通常需要 || 复杂度 | 中等 | 简单 || 适用场景 | 广播、通知 | 直接通信 |MQTT 的发布/订阅模式使其成为物联网、实时通信和消息推送等场景的理想选择。
阅读 0·2月21日 15:45

MQTT 的保留消息(Retained Messages)是什么?如何使用?

MQTT 的保留消息(Retained Messages)是一种特殊的消息机制,允许 Broker 保存最新消息,供新订阅者接收。保留消息的概念定义保留消息是 Broker 持久化存储的消息,当有新的客户端订阅该主题时,Broker 会立即将保留消息发送给该客户端。作用状态同步:新订阅者可以立即获取最新状态初始化数据:为新连接的客户端提供初始数据状态恢复:帮助客户端快速恢复到最新状态减少请求:避免客户端主动请求最新状态保留消息的工作原理设置保留消息发布消息时设置 Retain 标志:PUBLISH 报文参数:- Topic: 主题名称- Payload: 消息内容- QoS: QoS 级别- Retain: true(设置为保留消息)保留消息的存储存储位置:Broker 内存或持久化存储存储数量:每个主题只保留一条最新消息存储覆盖:新发布的保留消息会覆盖之前的保留消息保留消息的发送当客户端订阅主题时:客户端发送 SUBSCRIBE 报文Broker 检查该主题是否有保留消息如果有,立即发送保留消息给客户端然后发送后续的新消息保留消息的特性1. 每个主题一条规则:每个主题只保留一条最新的保留消息覆盖机制:新发布的保留消息会替换之前的清除机制:发布空消息(Payload 为空)可以清除保留消息2. QoS 级别继承性:保留消息的 QoS 级别由发布时决定订阅限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置QoS 规则:实际 QoS = min(发布 QoS, 订阅 QoS)3. 持久化内存存储:默认存储在内存中持久化存储:可配置持久化到磁盘Broker 重启:持久化的保留消息在 Broker 重启后仍然存在4. 消息顺序发送顺序:保留消息在普通消息之前发送订阅时机:只在订阅时发送一次后续消息:不重复发送保留消息使用场景1. 设备状态同步场景:温度传感器保留主题:sensor/123/temperature保留消息:{"value": 25.5, "unit": "C", "timestamp": 1234567890}新订阅者订阅 sensor/123/temperature立即收到最新温度值2. 配置信息发布场景:设备配置保留主题:config/device/123保留消息:{"mode": "auto", "interval": 60}新设备上线订阅配置主题立即获取最新配置3. 系统状态广播场景:系统状态保留主题:system/status保留消息:{"status": "running", "version": "1.0.0"}新客户端订阅系统状态立即获取当前系统状态4. 开关状态场景:智能开关保留主题:switch/123/state保留消息:{"state": "on"}新订阅者立即获取开关状态代码示例Python (paho-mqtt)import paho.mqtt.client as mqttimport jsonimport timedef on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 订阅主题 client.subscribe("sensor/+/temperature")def on_message(client, userdata, msg): print(f"Received: {msg.topic} - {msg.payload.decode()}") print(f"Retained: {msg.retain}")# 发布保留消息client = mqtt.Client()client.connect("broker.example.com", 1883, 60)# 发布保留消息(retain=True)message = {"value": 25.5, "unit": "C", "timestamp": int(time.time())}client.publish("sensor/123/temperature", json.dumps(message), retain=True)# 清除保留消息(发布空消息)# client.publish("sensor/123/temperature", "", retain=True)client.disconnect()# 订阅者subscriber = mqtt.Client()subscriber.on_connect = on_connectsubscriber.on_message = on_messagesubscriber.connect("broker.example.com", 1883, 60)subscriber.loop_forever()JavaScript (MQTT.js)const mqtt = require('mqtt');// 发布保留消息const publisher = mqtt.connect('mqtt://broker.example.com');publisher.on('connect', () => { console.log('Publisher connected'); // 发布保留消息(retain: true) const message = JSON.stringify({ value: 25.5, unit: 'C', timestamp: Date.now() }); publisher.publish('sensor/123/temperature', message, { retain: true }); // 清除保留消息(发布空消息) // publisher.publish('sensor/123/temperature', '', { retain: true }); publisher.end();});// 订阅者const subscriber = mqtt.connect('mqtt://broker.example.com');subscriber.on('connect', () => { console.log('Subscriber connected'); subscriber.subscribe('sensor/+/temperature');});subscriber.on('message', (topic, message) => { console.log(`Received: ${topic} - ${message.toString()}`); console.log(`Retained: ${message.retain}`);});最佳实践1. 保留消息设计状态信息:保留消息应该表示当前状态简洁明了:消息内容简洁,易于解析包含时间戳:便于判断消息的新旧程度版本控制:可以包含版本信息2. 主题命名推荐格式:- sensor/{device_id}/temperature- config/{device_id}- status/{system_id}避免使用:- 通配符主题(不能发布到通配符主题)- 过于复杂的主题结构3. 消息大小限制大小:保留消息不宜过大建议大小:通常小于 1KBBroker 限制:注意 Broker 对消息大小的限制4. QoS 选择一般状态:QoS 0重要状态:QoS 1关键状态:QoS 25. 清除机制主动清除:发布空消息清除保留消息定期清理:定期检查和清理过期的保留消息生命周期管理:为保留消息设置合理的生命周期注意事项内存占用:保留消息会占用 Broker 内存,大量保留消息可能影响性能持久化配置:如果需要保留消息在 Broker 重启后仍然存在,需要配置持久化消息更新:频繁更新保留消息会增加 Broker 负担订阅时机:保留消息只在订阅时发送,不会重复发送QoS 限制:订阅者接收的 QoS 级别受限于订阅时的 QoS 设置空消息清除:发布空消息(Payload 为空)可以清除保留消息保留消息 vs 遗嘱消息| 特性 | 保留消息 | 遗嘱消息 ||-----|---------|---------|| 触发时机 | 订阅时 | 异常断开时 || 消息来源 | 发布者设置 | 客户端设置 || 存储位置 | Broker | Broker || 发送对象 | 新订阅者 | 订阅该主题的客户端 || 消息数量 | 每主题一条 | 每客户端一条 || 清除方式 | 发布空消息 | 正常断开或重新连接 |保留消息的局限性每主题一条:每个主题只能保留一条消息,无法保存历史消息内存占用:大量保留消息会占用较多内存实时性:保留消息可能不是最新的(取决于发布频率)无历史记录:无法获取历史状态变化依赖 Broker:完全依赖 Broker 的可靠性MQTT 保留消息是物联网应用中非常重要的机制,合理使用可以有效实现状态同步和初始化,提高用户体验和系统可靠性。
阅读 0·2月21日 15:44