
Interview Question Handbook

How do you back up and restore MariaDB? What backup strategies and tools are available?

Backup and recovery are a core part of protecting MariaDB data. The main approaches:

1. Logical backup (mysqldump)

Full backup:

```bash
# Back up all databases
mysqldump -u root -p --all-databases > all_databases.sql

# Back up a single database
mysqldump -u root -p database_name > database_name.sql

# Back up a single table
mysqldump -u root -p database_name table_name > table_name.sql

# Back up and compress
mysqldump -u root -p database_name | gzip > database_name.sql.gz
```

Incremental backup (via the binary log):

```ini
# my.cnf — enable binary logging
log-bin        = mysql-bin
binlog-format  = ROW
```

```bash
# Export binary-log contents
mysqlbinlog mysql-bin.000001 > binlog_backup.sql
```

Restoring data:

```bash
# Restore a full backup
mysql -u root -p < all_databases.sql

# Restore a single database
mysql -u root -p database_name < database_name.sql

# Restore a compressed backup
gunzip < database_name.sql.gz | mysql -u root -p database_name

# Replay the binary log (point-in-time recovery)
mysqlbinlog mysql-bin.000001 | mysql -u root -p
```

2. Physical backup (Mariabackup)

Full backup:

```bash
# Create the backup
mariabackup --backup --target-dir=/backup/full \
  --user=root --password=password

# Prepare the backup
mariabackup --prepare --target-dir=/backup/full

# Restore the backup
mariabackup --copy-back --target-dir=/backup/full
```

Incremental backup:

```bash
# Create a full base backup
mariabackup --backup --target-dir=/backup/full \
  --user=root --password=password

# Create an incremental backup on top of it
mariabackup --backup --target-dir=/backup/inc1 \
  --incremental-basedir=/backup/full --user=root --password=password

# Prepare: first the base, then apply the increment
mariabackup --prepare --target-dir=/backup/full
mariabackup --prepare --target-dir=/backup/full \
  --incremental-dir=/backup/inc1
```

3. Snapshot backup

```bash
# LVM snapshot
lvcreate -L 10G -s -n mysql_snapshot /dev/vg0/mysql
mount /dev/vg0/mysql_snapshot /mnt/backup
rsync -av /mnt/backup/ /backup/mysql/
umount /mnt/backup
lvremove /dev/vg0/mysql_snapshot
```

4. Automated backup script

```bash
#!/bin/bash
# backup.sh
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backup/mariadb"
MYSQL_USER="root"
MYSQL_PASSWORD="password"

# Create the backup directory
mkdir -p "$BACKUP_DIR"

# Full backup: consistent InnoDB snapshot, no table locks
mysqldump -u "$MYSQL_USER" -p"$MYSQL_PASSWORD" --all-databases \
  --single-transaction --quick --lock-tables=false \
  | gzip > "$BACKUP_DIR/all_$DATE.sql.gz"

# Keep only the last 7 days of backups
find "$BACKUP_DIR" -name "all_*.sql.gz" -mtime +7 -delete

echo "Backup completed: all_$DATE.sql.gz"
```

5. Recommended backup strategy

- Full backup: nightly, during off-peak hours
- Incremental backup: hourly
- Binary logs: retained continuously
- Off-site copies: synced regularly to a remote server
- Verification: test the restore procedure on a schedule

6. Restore checklist

- Stop the MariaDB service before a physical restore
- Make sure there is enough free disk space
- Verify data integrity after the restore
- Record the recovery procedure and the point in time recovered to
- Rehearse the restore in a test environment first

With a sound backup strategy and a rehearsed restore procedure, MariaDB data can be protected against most failure scenarios.
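The 7-day retention step in backup.sh (`find … -mtime +7 -delete`) can be expressed as a pure function, which makes the pruning rule easy to test before pointing it at real backups. This is a minimal sketch; `backups_to_prune` is a hypothetical helper that assumes the `all_YYYYmmdd_HHMMSS.sql.gz` naming used by the script above.

```python
from datetime import datetime, timedelta

def backups_to_prune(filenames, now, keep_days=7):
    """Return backup files older than `keep_days`, given names like
    'all_YYYYmmdd_HHMMSS.sql.gz' (the format produced by backup.sh)."""
    cutoff = now - timedelta(days=keep_days)
    stale = []
    for name in filenames:
        # Parse the timestamp embedded in the filename
        stamp = name.removeprefix("all_").removesuffix(".sql.gz")
        taken = datetime.strptime(stamp, "%Y%m%d_%H%M%S")
        if taken < cutoff:
            stale.append(name)
    return stale

# With "now" at 2024-03-10, a nine-day-old backup is pruned, a one-day-old kept
names = ["all_20240301_020000.sql.gz", "all_20240309_020000.sql.gz"]
print(backups_to_prune(names, datetime(2024, 3, 10)))  # → ['all_20240301_020000.sql.gz']
```

Keeping the selection logic separate from the deletion makes it trivial to dry-run the policy against a directory listing.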
Views: 0 · Feb 21, 15:51

What partition types does MariaDB support? How do you create and manage partitioned tables?

Partitioning in MariaDB splits a large table into smaller, more manageable pieces, which can significantly improve query performance and simplify maintenance.

1. Partition types

RANGE partitioning:

```sql
-- Partition by date range
-- (the primary key must include every column in the partitioning expression)
CREATE TABLE orders (
    id INT,
    order_date DATE,
    customer_id INT,
    amount DECIMAL(10,2),
    PRIMARY KEY (id, order_date)
) PARTITION BY RANGE (YEAR(order_date)) (
    PARTITION p2022 VALUES LESS THAN (2023),
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- Partition by numeric range
CREATE TABLE users (
    id INT,
    username VARCHAR(50),
    age INT,
    PRIMARY KEY (id, age)
) PARTITION BY RANGE (age) (
    PARTITION p0 VALUES LESS THAN (18),
    PARTITION p1 VALUES LESS THAN (30),
    PARTITION p2 VALUES LESS THAN (50),
    PARTITION p3 VALUES LESS THAN MAXVALUE
);
```

LIST partitioning:

```sql
-- Partition by a list of discrete values
CREATE TABLE orders (
    id INT,
    order_date DATE,
    region VARCHAR(20),
    amount DECIMAL(10,2),
    PRIMARY KEY (id, region)
) PARTITION BY LIST COLUMNS(region) (
    PARTITION p_east VALUES IN ('New York', 'Boston', 'Philadelphia'),
    PARTITION p_west VALUES IN ('Los Angeles', 'San Francisco', 'Seattle'),
    PARTITION p_midwest VALUES IN ('Chicago', 'Detroit', 'Cleveland'),
    PARTITION p_south VALUES IN ('Atlanta', 'Miami', 'Dallas')
);
```

HASH partitioning:

```sql
-- Hash partitioning (distributes rows evenly)
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    category_id INT,
    price DECIMAL(10,2)
) PARTITION BY HASH(id) PARTITIONS 4;

-- Linear hash (faster to split/merge, but distribution may be less even)
CREATE TABLE orders (
    id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    PRIMARY KEY (id, customer_id)
) PARTITION BY LINEAR HASH(customer_id) PARTITIONS 8;
```

KEY partitioning:

```sql
-- KEY partitioning (like HASH, but uses the server's own hashing of the key columns)
CREATE TABLE users (
    id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100)
) PARTITION BY KEY(id) PARTITIONS 4;
```

2. Managing partitions

Adding partitions:

```sql
-- RANGE: add a new partition
ALTER TABLE orders ADD PARTITION (
    PARTITION p2025 VALUES LESS THAN (2026)
);

-- LIST: add a new partition
ALTER TABLE orders ADD PARTITION (
    PARTITION p_other VALUES IN ('Other', 'Unknown')
);

-- HASH/KEY: increase the number of partitions
ALTER TABLE products ADD PARTITION PARTITIONS 2;  -- 4 -> 6 partitions
```

Dropping partitions:

```sql
-- Drop a partition (its data is deleted with it)
ALTER TABLE orders DROP PARTITION p2022;

-- Remove all partitioning (convert back to a regular table, keeping the data)
ALTER TABLE orders REMOVE PARTITIONING;
```

Merging partitions:

```sql
-- Merge adjacent RANGE partitions
ALTER TABLE orders REORGANIZE PARTITION p2022, p2023 INTO (
    PARTITION p2022_2023 VALUES LESS THAN (2024)
);
```

3. Querying partitions

```sql
-- Inspect partition metadata
SELECT PARTITION_NAME, PARTITION_METHOD, PARTITION_EXPRESSION,
       PARTITION_DESCRIPTION, TABLE_ROWS
FROM information_schema.PARTITIONS
WHERE TABLE_NAME = 'orders';

-- Query a specific partition
SELECT * FROM orders PARTITION (p2023);

-- Let partition pruning optimize the query
SELECT * FROM orders
WHERE order_date >= '2023-01-01' AND order_date < '2024-01-01';
```

4. Partition maintenance

```sql
ALTER TABLE orders CHECK PARTITION p2023;     -- check
ALTER TABLE orders OPTIMIZE PARTITION p2023;  -- optimize
ALTER TABLE orders ANALYZE PARTITION p2023;   -- analyze
ALTER TABLE orders REPAIR PARTITION p2023;    -- repair
```

5. Indexes on partitioned tables

```sql
-- Indexes on a partitioned table are local: each partition keeps its own index
CREATE INDEX idx_customer_id ON orders(customer_id);
```

Note that MariaDB does not support global indexes on partitioned tables; every UNIQUE index (including the primary key) must contain all columns used in the partitioning expression.

6. Typical use cases

Time-series data:

```sql
CREATE TABLE logs (
    id BIGINT,
    log_time TIMESTAMP,
    message TEXT,
    level VARCHAR(10),
    PRIMARY KEY (id, log_time)
) PARTITION BY RANGE (UNIX_TIMESTAMP(log_time)) (
    PARTITION p2024_q1 VALUES LESS THAN (UNIX_TIMESTAMP('2024-04-01')),
    PARTITION p2024_q2 VALUES LESS THAN (UNIX_TIMESTAMP('2024-07-01')),
    PARTITION p2024_q3 VALUES LESS THAN (UNIX_TIMESTAMP('2024-10-01')),
    PARTITION p2024_q4 VALUES LESS THAN (UNIX_TIMESTAMP('2025-01-01'))
);
```

Archiving large tables:

```sql
-- Periodically drop partitions holding expired data
ALTER TABLE orders DROP PARTITION p2022;
```

7. Caveats

- Partition key: choose a column that appears in most queries
- Partition count: keep it moderate (roughly 10-100 partitions)
- Primary key: must include the partition key
- Unique indexes: must include the partition key
- Data distribution: make sure rows spread evenly across partitions
- Maintenance cost: partitioned tables need extra maintenance operations

Partitioning is an effective way to handle very large tables; used appropriately, it can significantly improve both query performance and manageability.
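The archiving pattern above (drop the oldest partition, pre-create the next one) is usually scripted. A pure function that only generates the DDL keeps the rotation rule testable; `rotation_ddl` is a hypothetical helper that assumes yearly RANGE partitions named `pNNNN` and skips a `pmax` catch-all.

```python
def rotation_ddl(table, partitions, keep, next_year):
    """Given RANGE partition names like 'p2022', return
    (drop_statements, add_statement) keeping only the most recent
    `keep` yearly partitions and pre-creating next year's."""
    # Ignore a catch-all 'pmax' partition if present
    years = sorted(int(p[1:]) for p in partitions if p != "pmax")
    drops = [f"ALTER TABLE {table} DROP PARTITION p{y};" for y in years[:-keep]]
    add = (f"ALTER TABLE {table} ADD PARTITION "
           f"(PARTITION p{next_year} VALUES LESS THAN ({next_year + 1}));")
    return drops, add

drops, add = rotation_ddl("orders", ["p2022", "p2023", "p2024"], keep=2, next_year=2025)
print(drops)  # → ['ALTER TABLE orders DROP PARTITION p2022;']
print(add)    # → ALTER TABLE orders ADD PARTITION (PARTITION p2025 VALUES LESS THAN (2026));
```

Generating the statements separately from executing them lets you review the rotation in a dry run, or wire it into a scheduled MariaDB EVENT or cron job.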

What data persistence and caching strategies can an MCP server use?

Data persistence and caching are critical to the performance and reliability of an MCP system. A complete design covers:

- Storage type: pick an appropriate store (relational, document, key-value, ...)
- Data model: design a sensible record model
- Persistence strategy: persist data efficiently
- Caching: layer caches in front of the store
- Consistency: keep cache and store in sync
- Backup and recovery: protect against data loss

1. Data model design

```python
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
import uuid

class DataType(Enum):
    """Kinds of data the server persists."""
    TOOL = "tool"
    RESOURCE = "resource"
    PROMPT = "prompt"
    SESSION = "session"
    METADATA = "metadata"

@dataclass
class DataRecord:
    """A single versioned record."""
    id: str
    data_type: DataType
    content: Dict[str, Any]
    created_at: datetime
    updated_at: datetime
    version: int = 1
    metadata: Dict[str, Any] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class DataModel:
    """In-memory registry of records."""

    def __init__(self):
        self.records: Dict[str, DataRecord] = {}

    def create_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None
    ) -> DataRecord:
        """Create a new record."""
        record_id = self._generate_id(data_type)
        now = datetime.now()
        record = DataRecord(
            id=record_id,
            data_type=data_type,
            content=content,
            created_at=now,
            updated_at=now,
            metadata=metadata or {}
        )
        self.records[record_id] = record
        return record

    def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None
    ) -> Optional[DataRecord]:
        """Update a record, bumping its version."""
        if record_id not in self.records:
            return None
        record = self.records[record_id]
        if content:
            record.content.update(content)
        if metadata:
            record.metadata.update(metadata)
        record.updated_at = datetime.now()
        record.version += 1
        return record

    def get_record(self, record_id: str) -> Optional[DataRecord]:
        return self.records.get(record_id)

    def delete_record(self, record_id: str) -> bool:
        if record_id in self.records:
            del self.records[record_id]
            return True
        return False

    def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query records by type and simple equality filters."""
        records = list(self.records.values())
        if data_type:
            records = [r for r in records if r.data_type == data_type]
        if filters:
            for key, value in filters.items():
                records = [r for r in records if self._match_filter(r, key, value)]
        return records

    def _generate_id(self, data_type: DataType) -> str:
        return f"{data_type.value}_{uuid.uuid4().hex}"

    def _match_filter(self, record: DataRecord, key: str, value: Any) -> bool:
        # Check content first, then metadata
        if key in record.content:
            return record.content[key] == value
        if key in record.metadata:
            return record.metadata[key] == value
        return False
```

2. Persistent storage

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from pathlib import Path
import json

class PersistenceStorage(ABC):
    """Base class for persistent stores."""

    @abstractmethod
    async def save_record(self, record: DataRecord) -> bool: ...

    @abstractmethod
    async def load_record(self, record_id: str) -> Optional[DataRecord]: ...

    @abstractmethod
    async def delete_record(self, record_id: str) -> bool: ...

    @abstractmethod
    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]: ...

class FileStorage(PersistenceStorage):
    """One JSON file per record."""

    def __init__(self, storage_dir: str = "data"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, record_id: str) -> Path:
        return self.storage_dir / f"{record_id}.json"

    @staticmethod
    def _from_dict(data: Dict[str, Any]) -> DataRecord:
        """Rebuild a DataRecord from its JSON form."""
        return DataRecord(
            id=data["id"],
            data_type=DataType(data["data_type"]),
            content=data["content"],
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            version=data["version"],
            metadata=data.get("metadata", {})
        )

    async def save_record(self, record: DataRecord) -> bool:
        try:
            data = {
                "id": record.id,
                "data_type": record.data_type.value,
                "content": record.content,
                "created_at": record.created_at.isoformat(),
                "updated_at": record.updated_at.isoformat(),
                "version": record.version,
                "metadata": record.metadata
            }
            with open(self._get_file_path(record.id), "w") as f:
                json.dump(data, f, indent=2)
            return True
        except Exception as e:
            print(f"Failed to save record: {e}")
            return False

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        file_path = self._get_file_path(record_id)
        if not file_path.exists():
            return None
        try:
            with open(file_path) as f:
                return self._from_dict(json.load(f))
        except Exception as e:
            print(f"Failed to load record: {e}")
            return None

    async def delete_record(self, record_id: str) -> bool:
        file_path = self._get_file_path(record_id)
        if file_path.exists():
            file_path.unlink()
            return True
        return False

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        records = []
        for file_path in self.storage_dir.glob("*.json"):
            try:
                with open(file_path) as f:
                    record = self._from_dict(json.load(f))
            except Exception as e:
                print(f"Failed to load record {file_path}: {e}")
                continue
            # Apply the filters
            if data_type and record.data_type != data_type:
                continue
            if filters and not all(
                (k in record.content and record.content[k] == v)
                or (k in record.metadata and record.metadata[k] == v)
                for k, v in filters.items()
            ):
                continue
            records.append(record)
        return records

class DatabaseStorage(PersistenceStorage):
    """SQLAlchemy-backed store."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self._initialize_database()

    def _initialize_database(self):
        from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
        from sqlalchemy.orm import declarative_base, sessionmaker

        Base = declarative_base()

        class DataRecordTable(Base):
            __tablename__ = "data_records"
            id = Column(String(100), primary_key=True)
            data_type = Column(String(50), nullable=False, index=True)
            content = Column(Text, nullable=False)
            created_at = Column(DateTime, nullable=False)
            updated_at = Column(DateTime, nullable=False)
            version = Column(Integer, default=1)
            # "metadata" is reserved on declarative classes, so the attribute
            # uses a different name while the column itself stays "metadata"
            record_metadata = Column("metadata", Text)

        self.engine = create_engine(self.database_url)
        Base.metadata.create_all(self.engine)
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )
        self.DataRecordTable = DataRecordTable

    def _from_row(self, db_record) -> DataRecord:
        return DataRecord(
            id=db_record.id,
            data_type=DataType(db_record.data_type),
            content=json.loads(db_record.content),
            created_at=db_record.created_at,
            updated_at=db_record.updated_at,
            version=db_record.version,
            metadata=json.loads(db_record.record_metadata)
                     if db_record.record_metadata else {}
        )

    async def save_record(self, record: DataRecord) -> bool:
        session = self.SessionLocal()
        try:
            db_record = self.DataRecordTable(
                id=record.id,
                data_type=record.data_type.value,
                content=json.dumps(record.content),
                created_at=record.created_at,
                updated_at=record.updated_at,
                version=record.version,
                record_metadata=json.dumps(record.metadata)
            )
            session.merge(db_record)  # insert or update
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"Failed to save record: {e}")
            return False
        finally:
            session.close()

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        session = self.SessionLocal()
        try:
            db_record = session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).first()
            return self._from_row(db_record) if db_record else None
        except Exception as e:
            print(f"Failed to load record: {e}")
            return None
        finally:
            session.close()

    async def delete_record(self, record_id: str) -> bool:
        session = self.SessionLocal()
        try:
            session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).delete()
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"Failed to delete record: {e}")
            return False
        finally:
            session.close()

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        session = self.SessionLocal()
        try:
            query = session.query(self.DataRecordTable)
            if data_type:
                query = query.filter(
                    self.DataRecordTable.data_type == data_type.value
                )
            records = [self._from_row(row) for row in query.all()]
            # Apply the filters in Python (content is stored as JSON text)
            if filters:
                records = [
                    r for r in records
                    if all(
                        (k in r.content and r.content[k] == v)
                        or (k in r.metadata and r.metadata[k] == v)
                        for k, v in filters.items()
                    )
                ]
            return records
        except Exception as e:
            print(f"Failed to query records: {e}")
            return []
        finally:
            session.close()
```

3. Cache strategies

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
import json
import time

class CacheStrategy(ABC):
    """Base class for caches."""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]: ...

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = None): ...

    @abstractmethod
    async def delete(self, key: str) -> bool: ...

    @abstractmethod
    async def clear(self): ...

class MemoryCache(CacheStrategy):
    """In-process cache with TTL-based expiry."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: Dict[str, tuple] = {}

    async def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        value, timestamp, ttl = self.cache[key]
        # Drop the entry lazily if it has expired
        if ttl and time.time() - timestamp > ttl:
            del self.cache[key]
            return None
        return value

    async def set(self, key: str, value: Any, ttl: int = None):
        if len(self.cache) >= self.max_size:
            self._evict()
        self.cache[key] = (value, time.time(), ttl or self.default_ttl)

    async def delete(self, key: str) -> bool:
        if key in self.cache:
            del self.cache[key]
            return True
        return False

    async def clear(self):
        self.cache.clear()

    def _evict(self):
        # Naive eviction: drop the oldest-inserted entry
        if self.cache:
            del self.cache[next(iter(self.cache))]

class RedisCache(CacheStrategy):
    """Redis-backed cache (values stored as JSON)."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url)

    async def get(self, key: str) -> Optional[Any]:
        try:
            value = await self.redis.get(key)
            return json.loads(value) if value else None
        except Exception as e:
            print(f"Failed to read cache: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None):
        try:
            serialized = json.dumps(value)
            if ttl:
                await self.redis.setex(key, ttl, serialized)
            else:
                await self.redis.set(key, serialized)
        except Exception as e:
            print(f"Failed to write cache: {e}")

    async def delete(self, key: str) -> bool:
        try:
            return await self.redis.delete(key) > 0
        except Exception as e:
            print(f"Failed to delete cache key: {e}")
            return False

    async def clear(self):
        try:
            await self.redis.flushdb()
        except Exception as e:
            print(f"Failed to clear cache: {e}")

class MultiLevelCache:
    """A fast local L1 in front of an optional shared L2."""

    def __init__(self, l1_cache: CacheStrategy, l2_cache: CacheStrategy = None):
        self.l1_cache = l1_cache
        self.l2_cache = l2_cache

    async def get(self, key: str) -> Optional[Any]:
        value = await self.l1_cache.get(key)
        if value is not None:
            return value
        if self.l2_cache:
            value = await self.l2_cache.get(key)
            if value is not None:
                await self.l1_cache.set(key, value)  # backfill L1
                return value
        return None

    async def set(self, key: str, value: Any, ttl: int = None):
        # Write through both levels
        await self.l1_cache.set(key, value, ttl)
        if self.l2_cache:
            await self.l2_cache.set(key, value, ttl)

    async def delete(self, key: str) -> bool:
        l1_deleted = await self.l1_cache.delete(key)
        l2_deleted = await self.l2_cache.delete(key) if self.l2_cache else True
        return l1_deleted or l2_deleted

    async def clear(self):
        await self.l1_cache.clear()
        if self.l2_cache:
            await self.l2_cache.clear()
```

4. Persistence manager

```python
class DataPersistenceManager:
    """Ties the data model, the store, and the cache together."""

    def __init__(self, storage: PersistenceStorage, cache: CacheStrategy = None):
        self.storage = storage
        self.cache = cache
        self.data_model = DataModel()

    async def save_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        record = self.data_model.create_record(data_type, content, metadata)
        if not await self.storage.save_record(record):
            return None
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def load_record(
        self, record_id: str, use_cache: bool = True
    ) -> Optional[DataRecord]:
        # Try the cache first
        if use_cache and self.cache:
            record = await self.cache.get(record_id)
            if record:
                return record
        # Fall back to the store, then backfill the cache
        record = await self.storage.load_record(record_id)
        if record and use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        record = self.data_model.update_record(record_id, content, metadata)
        if not record:
            return None
        if not await self.storage.save_record(record):
            return None
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def delete_record(self, record_id: str, use_cache: bool = True) -> bool:
        if not await self.storage.delete_record(record_id):
            return False
        if use_cache and self.cache:
            await self.cache.delete(record_id)
        self.data_model.delete_record(record_id)
        return True

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        return await self.storage.query_records(data_type, filters)
```

5. Backup and recovery

```python
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import json

class BackupManager:
    """File-level backups of the JSON storage directory."""

    def __init__(self, storage_dir: str = "data", backup_dir: str = "backups"):
        self.storage_dir = Path(storage_dir)
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    async def create_backup(self, backup_name: str = None) -> Optional[str]:
        if not backup_name:
            backup_name = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / backup_name
        try:
            backup_path.mkdir(parents=True, exist_ok=True)
            # Copy the data files
            for file_path in self.storage_dir.glob("*.json"):
                shutil.copy2(file_path, backup_path / file_path.name)
            # Record backup metadata alongside the files
            metadata = {
                "backup_name": backup_name,
                "created_at": datetime.now().isoformat(),
                "file_count": len(list(backup_path.glob("*.json")))
            }
            with open(backup_path / "backup_metadata.json", "w") as f:
                json.dump(metadata, f, indent=2)
            return backup_name
        except Exception as e:
            print(f"Failed to create backup: {e}")
            return None

    async def restore_backup(self, backup_name: str) -> bool:
        backup_path = self.backup_dir / backup_name
        if not backup_path.exists():
            print(f"Backup does not exist: {backup_name}")
            return False
        try:
            # Clear the current data directory
            for file_path in self.storage_dir.glob("*.json"):
                file_path.unlink()
            # Copy the backed-up files back
            for file_path in backup_path.glob("*.json"):
                if file_path.name != "backup_metadata.json":
                    shutil.copy2(file_path, self.storage_dir / file_path.name)
            return True
        except Exception as e:
            print(f"Failed to restore backup: {e}")
            return False

    async def list_backups(self) -> List[Dict[str, Any]]:
        backups = []
        for backup_path in self.backup_dir.iterdir():
            if not backup_path.is_dir():
                continue
            metadata_path = backup_path / "backup_metadata.json"
            if metadata_path.exists():
                with open(metadata_path) as f:
                    backups.append(json.load(f))
        return backups

    async def delete_backup(self, backup_name: str) -> bool:
        backup_path = self.backup_dir / backup_name
        if backup_path.exists():
            shutil.rmtree(backup_path)
            return True
        return False
```

Best practices:

- Tiered storage: pick the store according to access frequency
- Multi-level caching: put a fast local cache in front of a shared one
- Consistency: keep cache and store in sync (invalidate or update on every write)
- Regular backups: create backups on a schedule to guard against data loss
- Monitoring and alerting: watch the health of both storage and cache
- Performance: profile and optimize data access and storage paths

With a solid persistence layer and caching strategy in place, an MCP system can stay both fast and reliable.
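The expiry behaviour of the MemoryCache described above can be distilled into a minimal, synchronous sketch. The `TTLCache` name and the injectable clock are illustrative additions (not part of MCP); the injected clock is what makes expiry deterministic to test without sleeping.

```python
import time

class TTLCache:
    """Minimal TTL cache mirroring the MemoryCache semantics,
    with an injectable clock so expiry is easy to test."""

    def __init__(self, default_ttl=300, clock=time.time):
        self.default_ttl = default_ttl
        self.clock = clock
        self._store = {}

    def set(self, key, value, ttl=None):
        # Store the value with its insertion time and TTL
        self._store[key] = (value, self.clock(), ttl or self.default_ttl)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stamp, ttl = entry
        if self.clock() - stamp > ttl:  # expired: drop lazily on read
            del self._store[key]
            return None
        return value

# Simulated clock: the entry expires once the clock passes the TTL
now = [0.0]
cache = TTLCache(default_ttl=10, clock=lambda: now[0])
cache.set("k", "v")
print(cache.get("k"))   # → v
now[0] = 11.0
print(cache.get("k"))   # → None
```

The same lazy-expiry-on-read design is what keeps MemoryCache free of background timers: stale entries cost nothing until they are touched.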

How are error handling and retry mechanisms implemented in MCP?

Error handling and retries are central to keeping an MCP system stable and reliable. A complete design covers:

- Error classification: distinguish the different kinds of errors
- Error propagation: surface errors with enough context
- Error recovery: recover where recovery is possible
- Retry policy: retry intelligently (only retryable errors, with backoff)
- Circuit breaking: stop cascading failures
- Fallbacks: degrade gracefully when a dependency is down

1. Error classification and definitions

```python
from enum import Enum
from typing import Dict, Any
from dataclasses import dataclass

class ErrorType(Enum):
    """Error categories."""
    VALIDATION_ERROR = "validation_error"
    AUTHENTICATION_ERROR = "authentication_error"
    AUTHORIZATION_ERROR = "authorization_error"
    NOT_FOUND_ERROR = "not_found_error"
    CONFLICT_ERROR = "conflict_error"
    RATE_LIMIT_ERROR = "rate_limit_error"
    INTERNAL_ERROR = "internal_error"
    EXTERNAL_SERVICE_ERROR = "external_service_error"
    TIMEOUT_ERROR = "timeout_error"
    NETWORK_ERROR = "network_error"

class ErrorSeverity(Enum):
    """Error severity levels."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class MCPError(Exception):
    """Base class for MCP errors."""
    error_type: ErrorType
    message: str
    code: int
    details: Dict[str, Any] = None
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
    retryable: bool = False

    def __post_init__(self):
        if self.details is None:
            self.details = {}
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "error_type": self.error_type.value,
            "message": self.message,
            "code": self.code,
            "details": self.details,
            "severity": self.severity.value,
            "retryable": self.retryable
        }

class ValidationError(MCPError):
    def __init__(self, message: str, details: Dict[str, Any] = None):
        super().__init__(
            error_type=ErrorType.VALIDATION_ERROR,
            message=message, code=400, details=details,
            severity=ErrorSeverity.LOW, retryable=False
        )

class AuthenticationError(MCPError):
    def __init__(self, message: str = "Authentication failed"):
        super().__init__(
            error_type=ErrorType.AUTHENTICATION_ERROR,
            message=message, code=401,
            severity=ErrorSeverity.HIGH, retryable=False
        )

class AuthorizationError(MCPError):
    def __init__(self, message: str = "Access denied"):
        super().__init__(
            error_type=ErrorType.AUTHORIZATION_ERROR,
            message=message, code=403,
            severity=ErrorSeverity.HIGH, retryable=False
        )

class NotFoundError(MCPError):
    def __init__(self, resource: str, identifier: str):
        super().__init__(
            error_type=ErrorType.NOT_FOUND_ERROR,
            message=f"{resource} not found: {identifier}",
            code=404,
            details={"resource": resource, "identifier": identifier},
            severity=ErrorSeverity.LOW, retryable=False
        )

class RateLimitError(MCPError):
    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(
            error_type=ErrorType.RATE_LIMIT_ERROR,
            message=message, code=429,
            details={"retry_after": retry_after},
            severity=ErrorSeverity.MEDIUM, retryable=True
        )

class InternalError(MCPError):
    def __init__(self, message: str = "Internal server error"):
        super().__init__(
            error_type=ErrorType.INTERNAL_ERROR,
            message=message, code=500,
            severity=ErrorSeverity.CRITICAL, retryable=True
        )

class ExternalServiceError(MCPError):
    def __init__(self, service: str, message: str):
        super().__init__(
            error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
            message=f"{service} error: {message}",
            code=502, details={"service": service},
            severity=ErrorSeverity.HIGH, retryable=True
        )

class TimeoutError(MCPError):  # note: shadows the built-in TimeoutError in this module
    def __init__(self, operation: str, timeout: float):
        super().__init__(
            error_type=ErrorType.TIMEOUT_ERROR,
            message=f"{operation} timed out after {timeout}s",
            code=504,
            details={"operation": operation, "timeout": timeout},
            severity=ErrorSeverity.HIGH, retryable=True
        )
```

2. Error handler

```python
import asyncio
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List

class ErrorHandler:
    """Routes errors to registered handlers and reporters."""

    def __init__(self, logger: logging.Logger = None):
        self.logger = logger or logging.getLogger(__name__)
        self.error_handlers: Dict[ErrorType, Callable] = {}
        self.error_reporters: List[Callable] = []

    def register_handler(self, error_type: ErrorType, handler: Callable):
        self.error_handlers[error_type] = handler

    def register_reporter(self, reporter: Callable):
        self.error_reporters.append(reporter)

    async def handle_error(
        self, error: Exception, context: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        await self._log_error(error, context)
        await self._report_error(error, context)
        mcp_error = self._convert_to_mcp_error(error)
        # Dispatch to a type-specific handler if one is registered
        if mcp_error.error_type in self.error_handlers:
            try:
                return await self.error_handlers[mcp_error.error_type](
                    mcp_error, context
                )
            except Exception as e:
                self.logger.error(f"Error handler failed: {e}")
        # Default error response
        return mcp_error.to_dict()

    async def _log_error(self, error: Exception, context: Dict[str, Any] = None):
        if isinstance(error, MCPError):
            self.logger.error(
                f"MCP Error: {error.error_type.value} - {error.message}",
                extra={
                    "error_code": error.code,
                    "error_details": error.details,
                    "context": context
                }
            )
        else:
            self.logger.error(
                f"Unexpected error: {error}",
                exc_info=True,
                extra={"context": context}
            )

    async def _report_error(self, error: Exception, context: Dict[str, Any] = None):
        for reporter in self.error_reporters:
            try:
                await reporter(error, context)
            except Exception as e:
                self.logger.error(f"Error reporter failed: {e}")

    def _convert_to_mcp_error(self, error: Exception) -> MCPError:
        if isinstance(error, MCPError):
            return error
        # Map common exception types
        if isinstance(error, ValueError):
            return ValidationError(str(error))
        elif isinstance(error, PermissionError):
            return AuthorizationError(str(error))
        elif isinstance(error, asyncio.TimeoutError):  # the built-in timeout
            return TimeoutError("operation", 0)
        else:
            return InternalError(str(error))

# Example error reporter
class ErrorReporter:
    """Posts errors to an external error-tracking service."""

    def __init__(self, error_service_url: str):
        self.error_service_url = error_service_url
        self.logger = logging.getLogger(__name__)

    async def report_error(self, error: Exception, context: Dict[str, Any] = None):
        import aiohttp
        error_data = {
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
            "timestamp": datetime.now().isoformat()
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.error_service_url, json=error_data
                ) as response:
                    if response.status != 200:
                        self.logger.error(
                            f"Failed to report error: {response.status}"
                        )
        except Exception as e:
            self.logger.error(f"Failed to report error: {e}")
```

3. Retry mechanism

```python
import asyncio
from functools import wraps
from typing import Any, Callable

class RetryStrategy:
    """Base class for retry policies."""

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        raise NotImplementedError

    async def get_delay(self, attempt: int) -> float:
        raise NotImplementedError

class FixedDelayRetry(RetryStrategy):
    """Retry with a constant delay."""

    def __init__(self, max_attempts: int = 3, delay: float = 1.0):
        self.max_attempts = max_attempts
        self.delay = delay

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        if attempt >= self.max_attempts:
            return False
        if isinstance(error, MCPError):
            return error.retryable
        return True

    async def get_delay(self, attempt: int) -> float:
        return self.delay

class ExponentialBackoffRetry(RetryStrategy):
    """Retry with exponentially growing, capped delays."""

    def __init__(
        self,
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        if attempt >= self.max_attempts:
            return False
        if isinstance(error, MCPError):
            return error.retryable
        return True

    async def get_delay(self, attempt: int) -> float:
        delay = self.initial_delay * (self.backoff_factor ** attempt)
        return min(delay, self.max_delay)

class RetryManager:
    """Runs a coroutine under a retry policy."""

    def __init__(self, retry_strategy: RetryStrategy):
        self.retry_strategy = retry_strategy

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        attempt = 0
        while True:
            attempt += 1
            try:
                return await func(*args, **kwargs)
            except Exception as error:
                if not await self.retry_strategy.should_retry(attempt, error):
                    raise
                # Wait before the next attempt
                await asyncio.sleep(await self.retry_strategy.get_delay(attempt))

def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0
):
    """Retry decorator."""
    def decorator(func: Callable):
        retry_manager = RetryManager(ExponentialBackoffRetry(
            max_attempts=max_attempts,
            initial_delay=delay,
            max_delay=max_delay,
            backoff_factor=backoff_factor
        ))

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_manager.execute_with_retry(func, *args, **kwargs)
        return wrapper
    return decorator
```

4. Circuit breaker

```python
import asyncio
import time
from enum import Enum
from functools import wraps
from typing import Any, Callable, Optional

class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing fast
    HALF_OPEN = "half_open"  # probing for recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.lock = asyncio.Lock()

    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        async with self.lock:
            if self.state == CircuitState.OPEN:
                # After the cool-down, allow a probe request through
                if time.time() - self.last_failure_time > self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise MCPError(
                        error_type=ErrorType.INTERNAL_ERROR,
                        message="Circuit breaker is OPEN",
                        code=503,
                        retryable=True
                    )
        try:
            result = await func(*args, **kwargs)
        except Exception:
            async with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise
        async with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0
        return result

    def get_state(self) -> CircuitState:
        return self.state

    async def reset(self):
        # async, because `async with self.lock` is only valid in a coroutine
        async with self.lock:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            self.success_count = 0
            self.last_failure_time = None

def circuit_breaker(
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 60.0
):
    """Circuit-breaker decorator."""
    def decorator(func: Callable):
        breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            success_threshold=success_threshold,
            timeout=timeout
        )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await breaker.execute(func, *args, **kwargs)
        return wrapper
    return decorator
```

5. Fallback strategies

```python
from functools import wraps
from typing import Any, Callable, Dict, Optional

class FallbackStrategy:
    """Base class for fallbacks."""

    async def execute_fallback(
        self, error: Exception, context: Dict[str, Any] = None
    ) -> Any:
        raise NotImplementedError

class CacheFallback(FallbackStrategy):
    """Serve stale data from a cache when the primary path fails."""

    def __init__(self, cache: Dict[str, Any]):
        self.cache = cache

    async def execute_fallback(self, error, context=None):
        cache_key = context.get("cache_key") if context else None
        if cache_key and cache_key in self.cache:
            return self.cache[cache_key]
        raise error

class DefaultFallback(FallbackStrategy):
    """Return a fixed default value."""

    def __init__(self, default_value: Any):
        self.default_value = default_value

    async def execute_fallback(self, error, context=None):
        return self.default_value

class FallbackManager:
    def __init__(self):
        self.fallback_strategies: Dict[ErrorType, FallbackStrategy] = {}
        self.default_fallback: Optional[FallbackStrategy] = None

    def register_fallback(self, error_type: ErrorType, fallback: FallbackStrategy):
        self.fallback_strategies[error_type] = fallback

    def set_default_fallback(self, fallback: FallbackStrategy):
        self.default_fallback = fallback

    async def execute_with_fallback(
        self,
        func: Callable,
        context: Dict[str, Any] = None,
        *args,
        **kwargs
    ) -> Any:
        try:
            return await func(*args, **kwargs)
        except Exception as error:
            if not isinstance(error, MCPError):
                error = InternalError(str(error))
            # Look up a fallback for this error type
            fallback = self.fallback_strategies.get(
                error.error_type, self.default_fallback
            )
            if fallback:
                return await fallback.execute_fallback(error, context)
            raise error

def fallback(error_type: ErrorType = None, default_value: Any = None):
    """Fallback decorator."""
    def decorator(func: Callable):
        fallback_manager = FallbackManager()
        if error_type and default_value is not None:
            fallback_manager.register_fallback(
                error_type, DefaultFallback(default_value)
            )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await fallback_manager.execute_with_fallback(
                func, None, *args, **kwargs
            )
        return wrapper
    return decorator
```

6. Putting it all together

```python
from typing import Any, Dict, List
from mcp.server import Server

class RobustMCPServer(Server):
    """An MCP server wired up with the error-handling components above."""

    def __init__(self, name: str):
        super().__init__(name)
        self.error_handler = ErrorHandler()
        self.retry_manager = RetryManager(ExponentialBackoffRetry())
        self.circuit_breaker = CircuitBreaker()
        self.fallback_manager = FallbackManager()
        self._setup_error_handling()

    def _setup_error_handling(self):
        # Register type-specific error handlers
        self.error_handler.register_handler(
            ErrorType.VALIDATION_ERROR, self._handle_validation_error
        )
        self.error_handler.register_handler(
            ErrorType.RATE_LIMIT_ERROR, self._handle_rate_limit_error
        )
        # Register fallbacks
        self.fallback_manager.register_fallback(
            ErrorType.EXTERNAL_SERVICE_ERROR, CacheFallback({})
        )

    async def _handle_validation_error(
        self, error: ValidationError, context: Dict[str, Any]
    ) -> Dict[str, Any]:
        return {
            "error": error.to_dict(),
            "suggestions": self._get_validation_suggestions(error.details)
        }

    async def _handle_rate_limit_error(
        self, error: RateLimitError, context: Dict[str, Any]
    ) -> Dict[str, Any]:
        retry_after = error.details.get("retry_after", 60)
        return {
            "error": error.to_dict(),
            "retry_after": retry_after,
            "message": f"Please wait {retry_after} seconds before retrying"
        }

    def _get_validation_suggestions(self, details: Dict[str, Any]) -> List[str]:
        suggestions = []
        # Derive suggestions from the error details
        # ...
```
return suggestions @retry(max_attempts=3, delay=1.0) @circuit_breaker(failure_threshold=5, timeout=60.0) @fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={}) async def call_external_service( self, service_url: str, params: Dict[str, Any] ) -> Dict[str, Any]: """调用外部服务""" try: # 调用外部服务 # ... pass except Exception as error: # 转换为 MCP 错误 raise ExternalServiceError("external", str(error))最佳实践:错误分类:正确分类错误类型,便于针对性处理重试策略:根据错误类型选择合适的重试策略熔断机制:防止级联故障,保护系统稳定性降级策略:在故障时提供降级服务,保证基本功能错误日志:详细记录错误信息,便于问题排查监控告警:监控错误率,及时发现问题通过完善的错误处理和重试机制,可以确保 MCP 系统的稳定性和可靠性。
Read 0 · Feb 21, 15:51

What are the major changes and new features in MobX 6?

MobX 6 is a major release that introduced a number of important changes and new features. The main ones are:

1. Actions are enforced

MobX 5 and earlier:

```javascript
class Store {
  @observable count = 0;

  increment() {
    this.count++; // direct mutation was allowed
  }
}
```

MobX 6:

```javascript
class Store {
  @observable count = 0;

  constructor() {
    makeObservable(this); // required to activate the decorators in MobX 6
  }

  @action // mutations must go through an action
  increment() {
    this.count++;
  }
}
```

Under MobX 6's default configuration (`enforceActions: "observed"`), all modifications to observed state must happen inside an action. This gives better predictability and a better debugging experience.

2. Changes to the decorator API

MobX 5:

```javascript
import { observable, computed, action } from 'mobx';

class Store {
  @observable count = 0;

  @computed get doubled() {
    return this.count * 2;
  }

  @action increment() {
    this.count++;
  }
}
```

MobX 6:

```javascript
import { makeObservable, observable, computed, action } from 'mobx';

class Store {
  count = 0;

  constructor() {
    makeObservable(this, {
      count: observable,
      doubled: computed,
      increment: action
    });
  }

  get doubled() {
    return this.count * 2;
  }

  increment() {
    this.count++;
  }
}
```

MobX 6 recommends `makeObservable` with explicit annotations instead of decorators, although decorators are still supported (with the appropriate build configuration).

3. The new makeAutoObservable

MobX 6 introduced `makeAutoObservable`, which infers the annotation for each member automatically:

```javascript
import { makeAutoObservable } from 'mobx';

class Store {
  count = 0;
  firstName = 'John';
  lastName = 'Doe';

  constructor() {
    makeAutoObservable(this);
  }

  get fullName() {
    return `${this.firstName} ${this.lastName}`;
  }

  increment() {
    this.count++;
  }
}
```

`makeAutoObservable` automatically:

- marks getters as computed
- marks methods as actions
- marks fields as observable

4. Simplified configuration

MobX 5:

```javascript
import { configure } from 'mobx';

configure({
  enforceActions: 'always',
  computedRequiresReaction: true
});
```

MobX 6:

```javascript
import { configure } from 'mobx';

configure({
  enforceActions: 'observed',      // the default
  useProxies: 'always',            // the default
  computedRequiresReaction: false  // the default
});
```

MobX 6's defaults are stricter and more sensible out of the box.

5. Proxies by default

MobX 6 uses `Proxy` by default (in environments that support it), which gives better performance and a simpler API.

Advantages of Proxy:

- better performance
- a simpler API
- better TypeScript support
- fewer limitations

Environments without Proxy: for old browsers, MobX 6 offers a compatibility mode that can be enabled with `configure({ useProxies: "never" })`.

6. Removed and changed APIs

Several older APIs were removed or changed in MobX 6, most notably:

- `decorate()` was removed — call `makeObservable` in the constructor instead
- decorator support is no longer implicit — it requires `makeObservable(this)` plus build configuration
- the options argument of `toJS` was dropped (`toJS` itself still exists); to customize serialization, define a standard `toJSON()` method on your class

7. Improved TypeScript support

MobX 6's TypeScript support is more complete:

```typescript
import { makeAutoObservable } from 'mobx';

class Store {
  count: number = 0;
  firstName: string = 'John';
  lastName: string = 'Doe';

  constructor() {
    makeAutoObservable(this);
  }

  get fullName(): string {
    return `${this.firstName} ${this.lastName}`;
  }

  increment(): void {
    this.count++;
  }
}
```

Type inference is more accurate and the type definitions are leaner.

8. Performance optimizations

MobX 6 introduced several performance improvements:

- faster dependency tracking
- more efficient computed caching
- a better batching mechanism for updates
- a smaller memory footprint

9. Better debugging experience

MobX 6 provides better debugging tools:

- clearer error messages
- better stack traces
- improved DevTools support

10. Migration guide

To migrate from MobX 5 to MobX 6:

1. Update the dependencies (mobx-react 7 is the MobX 6-compatible line):

```shell
npm install mobx@6 mobx-react@7
```

2. Wrap state mutations in actions:

```javascript
// before
increment() {
  this.count++;
}

// after
@action
increment() {
  this.count++;
}
```

3. Call `makeObservable` or `makeAutoObservable` in every store constructor:

```javascript
constructor() {
  makeAutoObservable(this);
}
```

4. Review the configuration:

```javascript
configure({
  enforceActions: 'observed'
});
```

5. Replace removed APIs (e.g. `decorate()` with `makeObservable`).

Summary

MobX 6 is an important release. Its main improvements are:

- enforced actions for better predictability
- a simplified API and better TypeScript support
- Proxy-based observability for better performance
- a better debugging experience
- removal of deprecated APIs

Migrating to MobX 6 takes some work, but the improvements are worth it. New projects should start on MobX 6 directly; existing projects can migrate incrementally.
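The Proxy point above can be made concrete with a tiny, framework-free sketch of Proxy-based change tracking in plain JavaScript. This is purely illustrative — the `observable` helper and its eager notify-everyone behavior are simplifications, not MobX's actual implementation (which also tracks *which* properties each reaction reads):

```javascript
// Minimal sketch of Proxy-based change tracking (illustrative only).
function observable(target) {
  const listeners = new Set();
  const proxy = new Proxy(target, {
    set(obj, key, value) {
      obj[key] = value;              // apply the mutation
      listeners.forEach(fn => fn()); // notify every subscriber
      return true;                   // report success to the runtime
    }
  });
  return { proxy, subscribe: fn => listeners.add(fn) };
}

const { proxy: state, subscribe } = observable({ count: 0 });

let observed = -1;
subscribe(() => { observed = state.count; });

state.count = 5; // a plain assignment — no setter boilerplate needed
console.log(observed); // 5
```

Because the trap intercepts ordinary property assignment, user code mutates state with normal syntax; this is the property MobX 6 exploits, and why it can drop much of the pre-Proxy bookkeeping.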
Read 0 · Feb 21, 15:51

What are common MobX problems and their solutions?

MobX is a powerful state-management library, but a few problems come up regularly. Knowing them and their solutions helps you use MobX effectively.

1. Components not updating

Symptom: a component is wrapped in `observer`, but it does not re-render when state changes.

Cause 1: reading from a plain object instead of an observable

```javascript
// Wrong
const store = {
  count: 0
};

@observer
class Counter extends React.Component {
  render() {
    return <div>{store.count}</div>; // never updates
  }
}

// Right
import { observable } from 'mobx';

const store = observable({
  count: 0
});

@observer
class Counter extends React.Component {
  render() {
    return <div>{store.count}</div>; // updates
  }
}
```

Cause 2: modifying state outside an action (MobX 6)

```javascript
// Wrong (MobX 6)
class Store {
  @observable count = 0;

  increment() {
    this.count++; // not in an action — MobX 6 complains
  }
}

// Right
class Store {
  @observable count = 0;

  @action
  increment() {
    this.count++; // inside an action
  }
}
```

Cause 3: creating new objects in render

```javascript
// Wasteful: a new object on every render
@observer
class Component extends React.Component {
  render() {
    const style = { color: 'red' };
    return <div style={style}>{store.count}</div>;
  }
}

// Better: define it outside the component
const style = { color: 'red' };

@observer
class Component extends React.Component {
  render() {
    return <div style={style}>{store.count}</div>;
  }
}
```

2. Performance problems

Symptom: the application slows down; components re-render too often.

Cause 1: over-tracking

```javascript
// Wasteful: dereferencing many observables inside the render loop
@observer
class List extends React.Component {
  render() {
    return (
      <div>
        {store.items.map(item => (
          <div key={item.id}>
            {item.name} - {item.value} - {item.description}
          </div>
        ))}
      </div>
    );
  }
}

// Better: preprocess the data in a computed
class Store {
  @observable items = [];

  @computed get itemDisplayData() {
    return this.items.map(item => ({
      id: item.id,
      display: `${item.name} - ${item.value} - ${item.description}`
    }));
  }
}

@observer
class List extends React.Component {
  render() {
    return (
      <div>
        {store.itemDisplayData.map(item => (
          <div key={item.id}>{item.display}</div>
        ))}
      </div>
    );
  }
}
```

Cause 2: one component depending on too much state

```javascript
// Wasteful: one big component tracks everything it contains
@observer
class Dashboard extends React.Component {
  render() {
    return (
      <div>
        <UserInfo />
        <Settings />
        <DataCount />
      </div>
    );
  }
}

// Better: split into several observer components so each
// re-renders only when the state it reads changes
@observer
class UserInfo extends React.Component {
  render() {
    return (
      <div>
        <div>{store.user.name}</div>
        <div>{store.user.email}</div>
      </div>
    );
  }
}
```

3. Memory leaks

Symptom: after a component unmounts, its reactions keep running, leaking memory.

```javascript
// Wrong: the reaction is never disposed
useEffect(() => {
  autorun(() => {
    console.log(store.count);
  });
}, []);

// Right: dispose the reaction on unmount
useEffect(() => {
  const dispose = autorun(() => {
    console.log(store.count);
  });
  return () => {
    dispose(); // clean up the reaction
  };
}, []);
```

4. Async problems

Symptom: state changes made in async code do not take effect, or trigger `enforceActions` warnings.

```javascript
// Wrong: state is modified after an await, outside the action
@action
async fetchData() {
  this.loading = true;
  const data = await fetch('/api/data').then(r => r.json());
  this.data = data;      // no longer inside the action
  this.loading = false;  // no longer inside the action
}

// Right: use runInAction
@action
async fetchData() {
  this.loading = true;
  try {
    const data = await fetch('/api/data').then(r => r.json());
    runInAction(() => {
      this.data = data;
    });
  } finally {
    runInAction(() => {
      this.loading = false;
    });
  }
}

// Or use flow
fetchData = flow(function* () {
  this.loading = true;
  try {
    const response = yield fetch('/api/data');
    const data = yield response.json();
    this.data = data;
  } finally {
    this.loading = false;
  }
});
```

5. computed not updating

Symptom: a computed property does not update as expected.

Cause 1: side effects inside the computed

```javascript
// Wrong: computeds must not have side effects
@computed get badComputed() {
  console.log('Side effect!'); // does not belong here
  fetch('/api/data');          // definitely does not belong here
  return this.data;
}

// Right: a computed should be a pure function of observables
@computed get goodComputed() {
  return this.data.filter(item => item.active);
}
```

Cause 2: dependencies not read as expected

```javascript
// Problematic: a computed reacts only to the observables it
// actually dereferences during the computation
@computed get badComputed() {
  const data = this.data; // read but never used in the result
  return this.items.length;
}

// Right: read exactly the observables the result depends on
@computed get goodComputed() {
  return this.data.length + this.items.length;
}
```

6. Circular dependencies

Symptom: stores depend on each other in a cycle, causing infinite loops or performance problems.

```javascript
// Wrong: a cycle between stores
class StoreA {
  @observable value = 0;

  @computed get doubled() {
    return storeB.value * 2;
  }
}

class StoreB {
  @observable value = 0;

  @computed get doubled() {
    return storeA.value * 2;
  }
}

// Right: restructure to avoid the cycle
class Store {
  @observable valueA = 0;
  @observable valueB = 0;

  @computed get doubledA() {
    return this.valueA * 2;
  }

  @computed get doubledB() {
    return this.valueB * 2;
  }
}
```

7. Decorators not working

Symptom: using decorators throws errors or has no effect.

Make sure the build is configured:

```json
// package.json
{
  "babel": {
    "plugins": [
      ["@babel/plugin-proposal-decorators", { "legacy": true }],
      ["@babel/plugin-proposal-class-properties", { "loose": true }]
    ]
  }
}
```

Or avoid decorators entirely with `makeObservable`:

```javascript
// Without decorators
class Store {
  count = 0;

  constructor() {
    makeObservable(this, {
      count: observable
    });
  }
}
```

8. TypeScript type errors

Symptom: type errors when using MobX with TypeScript.

```typescript
// Error-prone: no explicit types
class Store {
  count = 0;

  constructor() {
    makeObservable(this, {
      count: observable
    });
  }
}

// Better: annotate the field, and the type parameter where needed
class Store {
  count: number = 0;

  constructor() {
    makeObservable<Store>(this, {
      count: observable
    });
  }
}
```

9. Array operations

Symptom: array updates feel inefficient, or seem not to propagate.

```javascript
// Works, but replaces the whole array reference on every add
@action
addItemByCopy(item) {
  this.items = [...this.items, item];
}

// Usually better: mutate the observable array in place
@action
addItem(item) {
  this.items.push(item);
}

// Or replace the contents in place
@action
replaceItems(newItems) {
  this.items.replace(newItems);
}
```

10. Hard to debug

Symptom: it is difficult to trace state changes and dependencies.

Use `trace`:

```javascript
import { trace } from 'mobx';

// Trace a computed property
trace(store, 'fullName');

// Trace a component render
@observer
class MyComponent extends React.Component {
  render() {
    trace(true); // log why this render was triggered
    return <div>{store.count}</div>;
  }
}
```

Use MobX DevTools:

```javascript
import { configure } from 'mobx';

configure({
  useProxies: 'ifavailable',
  isolateGlobalState: true // needed when multiple MobX copies are loaded
});
```

Summary of best practices:

1. Always modify state inside actions (MobX 6)
2. Wrap components that should react to state changes in `observer`
3. Avoid creating new objects in render
4. Use `computed` to optimize derived values
5. Dispose reactions and side effects promptly
6. Handle async state updates correctly
7. Avoid circular dependencies
8. Debug with `trace` and the DevTools
9. Split components sensibly to narrow their dependencies
10. Follow MobX's documented best practices

Following these practices avoids most common MobX problems and leads to stable, efficient applications.
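The memory-leak problem above comes down to the disposer pattern. Here is a minimal, MobX-free sketch of why every subscription needs a matching dispose call (the names are illustrative, not a real API):

```javascript
// Sketch of the disposer pattern behind autorun/reaction (illustrative).
function createEmitter() {
  const listeners = new Set();
  return {
    subscribe(fn) {
      listeners.add(fn);
      // The returned function is the "disposer": until it is called,
      // the listener (and everything it closes over) stays reachable.
      return () => listeners.delete(fn);
    },
    emit() { listeners.forEach(fn => fn()); },
    get size() { return listeners.size; }
  };
}

const emitter = createEmitter();
const dispose = emitter.subscribe(() => {});
console.log(emitter.size); // 1
dispose();                  // what the useEffect cleanup must do
console.log(emitter.size); // 0
```

`autorun` returns exactly such a disposer; returning it from `useEffect` ties the reaction's lifetime to the component's.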
Read 0 · Feb 21, 15:50

What is the difference between toJS, toJSON, and observable.shallow in MobX?

MobX offers several tools for working with state, including `toJS`, the standard `toJSON()` serialization hook, and the `observable.shallow` annotation. Understanding the differences and when to use each matters for using MobX correctly.

1. toJS

`toJS` recursively converts an observable object into a plain JavaScript object.

```javascript
import { observable, toJS, isObservable } from 'mobx';

const store = observable({
  user: {
    name: 'John',
    age: 30,
    address: {
      city: 'New York',
      country: 'USA'
    }
  },
  items: [1, 2, 3]
});

// Convert to a plain object
const plainObject = toJS(store);
console.log(plainObject);
// {
//   user: {
//     name: 'John',
//     age: 30,
//     address: { city: 'New York', country: 'USA' }
//   },
//   items: [1, 2, 3]
// }

// plainObject is no longer observable
console.log(isObservable(plainObject));       // false
console.log(isObservable(plainObject.user));  // false
console.log(isObservable(plainObject.items)); // false
```

Use cases:

- sending observable data to an API
- persisting observable data to localStorage
- passing data to libraries that cannot handle observables
- inspecting state while debugging

Example: sending to an API

```javascript
@action
async saveData() {
  const plainData = toJS(this.data);
  await api.saveData(plainData);
}
```

Example: persisting to localStorage

```javascript
@action
saveToLocalStorage() {
  const plainState = toJS(this.state);
  localStorage.setItem('appState', JSON.stringify(plainState));
}
```

2. toJSON

Note that `toJSON` is not a MobX export; it is the standard hook that `JSON.stringify` invokes on any object that defines it. Observable objects can be serialized with `JSON.stringify` directly:

```javascript
import { observable } from 'mobx';

const store = observable({
  user: {
    name: 'John',
    age: 30,
    address: {
      city: 'New York',
      country: 'USA'
    }
  },
  items: [1, 2, 3]
});

// Observables serialize directly
const jsonString = JSON.stringify(store);
console.log(jsonString);
// {"user":{"name":"John","age":30,"address":{"city":"New York","country":"USA"}},"items":[1,2,3]}
```

Custom toJSON:

```javascript
class User {
  @observable name = 'John';
  @observable password = 'secret';
  @observable email = 'john@example.com';

  toJSON() {
    return {
      name: this.name,
      email: this.email
      // password deliberately omitted
    };
  }
}

const user = new User();
const json = JSON.stringify(user);
console.log(json);
// {"name":"John","email":"john@example.com"}
```

Use cases:

- serializing observables to JSON
- sending data to a server
- storing data in a database
- building API responses

3. observable.shallow

`observable.shallow` is an annotation that makes only the collection itself observable; the values inside are not converted to observables.

```javascript
import { makeObservable, observable } from 'mobx';

// Deeply observable (the default)
const deepStore = observable({
  user: { name: 'John', age: 30 },
  items: [{ id: 1 }, { id: 2 }]
});

// Shallowly observable, via the annotation
class ShallowStore {
  user = { name: 'John', age: 30 };
  items = [{ id: 1 }, { id: 2 }];

  constructor() {
    makeObservable(this, {
      user: observable.shallow,
      items: observable.shallow
    });
  }
}
const shallowStore = new ShallowStore();

// deepStore converts nested objects too
deepStore.user.name = 'Jane';   // triggers reactions
deepStore.items.push({ id: 3 }); // triggers reactions

// shallowStore tracks the collections, but not their contents
shallowStore.items.push({ id: 3 }); // adding/removing items is tracked…
shallowStore.items[0].id = 99;      // …mutating an element is NOT

// replacing a top-level reference is always tracked
shallowStore.user = { name: 'Jane', age: 30 }; // triggers reactions
shallowStore.items = [{ id: 1 }];              // triggers reactions
```

Use cases:

- performance optimization: fewer dependencies to track
- avoiding the cost of deep conversion
- when only top-level changes matter
- handling large data structures

Example: a large array

```javascript
import { makeAutoObservable, observable, runInAction } from 'mobx';

class Store {
  items = [];

  constructor() {
    makeAutoObservable(this, {
      items: observable.shallow
    });
  }

  loadItems = async () => {
    const data = await fetch('/api/items').then(r => r.json());
    runInAction(() => {
      this.items = data; // the elements are not deeply converted
    });
  };
}
```

4. observable.deep

Deep observability is the default; `observable.deep` is its explicit annotation. Every nested property becomes observable:

```javascript
import { observable } from 'mobx';

const deepStore = observable({
  user: {
    name: 'John',
    age: 30,
    address: {
      city: 'New York',
      country: 'USA'
    }
  },
  items: [1, 2, 3]
});

// All nested properties are observable
deepStore.user.name = 'Jane';           // triggers reactions
deepStore.user.address.city = 'Boston'; // triggers reactions
deepStore.items.push(4);                // triggers reactions
```

5. Comparison

| Aspect | toJS | toJSON() | observable.shallow |
|--------|------|----------|--------------------|
| Purpose | convert to a plain JS object | standard `JSON.stringify` hook | create shallowly observable state |
| Depth | deep conversion | whatever the method returns | only the collection itself |
| Returns | plain JS object | JSON-serializable object | observable state |
| Result observable? | no | no | yes |
| Typical use | API calls, persistence | serialization, API responses | performance optimization |

6. Performance considerations

Use shallow to optimize:

```javascript
// Costly: deeply converting a huge array
class BadStore {
  @observable items = []; // possibly thousands of elements, all converted
}

// Cheaper: shallow observability
class GoodStore {
  @observable.shallow items = [];

  @action
  loadItems = async () => {
    const data = await fetch('/api/items').then(r => r.json());
    this.items = data; // the elements stay plain
  };
}
```

Avoid calling toJS on every render:

```javascript
// Wasteful: converts the whole tree on every render
@observer
class BadComponent extends React.Component {
  render() {
    const plainData = toJS(store.data);
    return <div>{plainData.length}</div>;
  }
}

// Better: read the observable directly
@observer
class GoodComponent extends React.Component {
  render() {
    return <div>{store.data.length}</div>;
  }
}
```

7. Common pitfalls

Pitfall 1: calling toJS inside a computed

```javascript
// Wasteful
@computed get badComputed() {
  const plainData = toJS(this.data);
  return plainData.filter(item => item.active);
}

// Better
@computed get goodComputed() {
  return this.data.filter(item => item.active);
}
```

Pitfall 2: forgetting the limits of shallow

```javascript
class Store {
  items = [{ done: false }];

  constructor() {
    makeObservable(this, { items: observable.shallow });
  }
}
const store = new Store();

// NOT tracked: the elements are plain objects
store.items[0].done = true;

// Tracked: the reference changed
store.items = [{ done: true }];
```

Pitfall 3: confusing toJS with toJSON

```javascript
const store = observable({
  user: { name: 'John' }
});

// toJS is a MobX function that returns a plain object
const plain = toJS(store);
console.log(plain instanceof Object); // true

// toJSON is not a MobX function — JSON.stringify calls an
// object's toJSON() method if one is defined
console.log(JSON.stringify(store)); // {"user":{"name":"John"}}
```

8. Best practices

1. Pick the observability depth the data needs:

```javascript
// small structures: deep (the default)
const smallStore = observable({
  config: { theme: 'dark', language: 'en' }
});

// large structures: shallow
class LargeStore {
  items = []; // possibly thousands of elements

  constructor() {
    makeObservable(this, { items: observable.shallow });
  }
}
```

2. Call toJS only when you actually need a plain object:

```javascript
// only when handing data to the API
@action
async sendData() {
  const plainData = toJS(this.data);
  await api.sendData(plainData);
}

// in components, read observables directly
const Component = observer(() => {
  return <div>{store.data.length}</div>;
});
```

3. Control serialization with a custom toJSON:

```javascript
class User {
  @observable id = 1;
  @observable name = 'John';
  @observable password = 'secret';

  toJSON() {
    return {
      id: this.id,
      name: this.name
      // no sensitive fields
    };
  }
}
```

Summary

- `toJS`: converts observables into plain JS objects, for API calls and persistence
- `toJSON()`: the standard serialization hook `JSON.stringify` uses; define it to control what gets serialized
- `observable.shallow`: makes only the collection itself observable, for performance

Using these tools appropriately leads to more efficient, more maintainable MobX applications.
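The custom `toJSON()` technique above is plain JavaScript and works without MobX at all, because it is `JSON.stringify` that looks for the method. A self-contained sketch (class and field names are illustrative):

```javascript
// JSON.stringify calls an object's toJSON() method when one is defined.
class User {
  constructor() {
    this.id = 1;
    this.name = 'John';
    this.password = 'secret';
  }

  toJSON() {
    // Expose only the non-sensitive fields
    return { id: this.id, name: this.name };
  }
}

const json = JSON.stringify(new User());
console.log(json); // {"id":1,"name":"John"}
```

An observable class can use the same hook, which is why MobX needs no dedicated serialization function beyond `toJS`.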
Read 0 · Feb 21, 15:50