Data persistence and caching strategies for MCP are crucial for improving system performance and reliability. Here are detailed implementation methods and best practices:
Data Persistence Architecture
MCP data persistence should take the following aspects into account:
- Storage Type: Choose an appropriate storage type (relational, document, key-value, etc.)
- Data Model: Design a sound data model
- Persistence Strategy: Implement an efficient persistence strategy
- Caching Strategy: Implement a multi-level caching strategy
- Data Consistency: Ensure consistency between cache and storage
- Backup and Recovery: Provide data backup and recovery mechanisms
1. Data Model Design
```python
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
import uuid


class DataType(Enum):
    """Data type"""
    TOOL = "tool"
    RESOURCE = "resource"
    PROMPT = "prompt"
    SESSION = "session"
    METADATA = "metadata"


@dataclass
class DataRecord:
    """Data record"""
    id: str
    data_type: DataType
    content: Dict[str, Any]
    created_at: datetime
    updated_at: datetime
    version: int = 1
    metadata: Dict[str, Any] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}


class DataModel:
    """Data model"""

    def __init__(self):
        self.records: Dict[str, DataRecord] = {}

    def create_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None
    ) -> DataRecord:
        """Create data record"""
        record_id = self._generate_id(data_type)
        now = datetime.now()
        record = DataRecord(
            id=record_id,
            data_type=data_type,
            content=content,
            created_at=now,
            updated_at=now,
            metadata=metadata or {}
        )
        self.records[record_id] = record
        return record

    def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None
    ) -> Optional[DataRecord]:
        """Update data record"""
        if record_id not in self.records:
            return None
        record = self.records[record_id]
        if content:
            record.content.update(content)
        if metadata:
            record.metadata.update(metadata)
        record.updated_at = datetime.now()
        record.version += 1
        return record

    def get_record(self, record_id: str) -> Optional[DataRecord]:
        """Get data record"""
        return self.records.get(record_id)

    def delete_record(self, record_id: str) -> bool:
        """Delete data record"""
        if record_id in self.records:
            del self.records[record_id]
            return True
        return False

    def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query data records"""
        records = list(self.records.values())
        if data_type:
            records = [r for r in records if r.data_type == data_type]
        if filters:
            for key, value in filters.items():
                records = [
                    r for r in records
                    if self._match_filter(r, key, value)
                ]
        return records

    def _generate_id(self, data_type: DataType) -> str:
        """Generate record ID"""
        return f"{data_type.value}_{uuid.uuid4().hex}"

    def _match_filter(
        self,
        record: DataRecord,
        key: str,
        value: Any
    ) -> bool:
        """Match filter condition"""
        # Check content
        if key in record.content:
            return record.content[key] == value
        # Check metadata
        if key in record.metadata:
            return record.metadata[key] == value
        return False
```
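The data model can be exercised on its own before any persistence is wired in. A minimal usage sketch; the tool name and metadata values are made up for illustration:

```python
model = DataModel()

# Create, update, and query an in-memory record
record = model.create_record(
    DataType.TOOL,
    content={"name": "search", "description": "Full-text search tool"},
    metadata={"owner": "platform-team"}
)
model.update_record(record.id, content={"description": "Search indexed resources"})

tools = model.query_records(data_type=DataType.TOOL, filters={"name": "search"})
print(record.version, len(tools))  # -> 2 1
```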
2. Persistence Storage Implementation
```python
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from datetime import datetime
from pathlib import Path
import json


class PersistenceStorage(ABC):
    """Persistence storage base class"""

    @abstractmethod
    async def save_record(self, record: DataRecord) -> bool:
        """Save record"""
        pass

    @abstractmethod
    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Load record"""
        pass

    @abstractmethod
    async def delete_record(self, record_id: str) -> bool:
        """Delete record"""
        pass

    @abstractmethod
    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query records"""
        pass


class FileStorage(PersistenceStorage):
    """File storage"""

    def __init__(self, storage_dir: str = "data"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, record_id: str) -> Path:
        """Get file path"""
        return self.storage_dir / f"{record_id}.json"

    async def save_record(self, record: DataRecord) -> bool:
        """Save record"""
        file_path = self._get_file_path(record.id)
        try:
            data = {
                "id": record.id,
                "data_type": record.data_type.value,
                "content": record.content,
                "created_at": record.created_at.isoformat(),
                "updated_at": record.updated_at.isoformat(),
                "version": record.version,
                "metadata": record.metadata
            }
            with open(file_path, 'w') as f:
                json.dump(data, f, indent=2)
            return True
        except Exception as e:
            print(f"Failed to save record: {e}")
            return False

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Load record"""
        file_path = self._get_file_path(record_id)
        if not file_path.exists():
            return None
        try:
            with open(file_path, 'r') as f:
                data = json.load(f)
            return DataRecord(
                id=data["id"],
                data_type=DataType(data["data_type"]),
                content=data["content"],
                created_at=datetime.fromisoformat(data["created_at"]),
                updated_at=datetime.fromisoformat(data["updated_at"]),
                version=data["version"],
                metadata=data.get("metadata", {})
            )
        except Exception as e:
            print(f"Failed to load record: {e}")
            return None

    async def delete_record(self, record_id: str) -> bool:
        """Delete record"""
        file_path = self._get_file_path(record_id)
        if file_path.exists():
            file_path.unlink()
            return True
        return False

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query records"""
        records = []
        for file_path in self.storage_dir.glob("*.json"):
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                record = DataRecord(
                    id=data["id"],
                    data_type=DataType(data["data_type"]),
                    content=data["content"],
                    created_at=datetime.fromisoformat(data["created_at"]),
                    updated_at=datetime.fromisoformat(data["updated_at"]),
                    version=data["version"],
                    metadata=data.get("metadata", {})
                )
                # Apply filter conditions
                if data_type and record.data_type != data_type:
                    continue
                if filters:
                    match = True
                    for key, value in filters.items():
                        if key in record.content and record.content[key] != value:
                            match = False
                            break
                        if key in record.metadata and record.metadata[key] != value:
                            match = False
                            break
                    if not match:
                        continue
                records.append(record)
            except Exception as e:
                print(f"Failed to load record {file_path}: {e}")
        return records


class DatabaseStorage(PersistenceStorage):
    """Database storage"""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self._initialize_database()

    def _initialize_database(self):
        """Initialize database"""
        from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
        from sqlalchemy.orm import declarative_base, sessionmaker

        Base = declarative_base()

        class DataRecordTable(Base):
            __tablename__ = 'data_records'
            id = Column(String(100), primary_key=True)
            data_type = Column(String(50), nullable=False, index=True)
            content = Column(Text, nullable=False)
            created_at = Column(DateTime, nullable=False)
            updated_at = Column(DateTime, nullable=False)
            version = Column(Integer, default=1)
            # "metadata" is a reserved attribute name on SQLAlchemy declarative
            # classes, so the attribute is named record_metadata and mapped to
            # a "metadata" column explicitly.
            record_metadata = Column("metadata", Text)

        self.engine = create_engine(self.database_url)
        Base.metadata.create_all(self.engine)
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )
        self.DataRecordTable = DataRecordTable

    async def save_record(self, record: DataRecord) -> bool:
        """Save record"""
        session = self.SessionLocal()
        try:
            db_record = self.DataRecordTable(
                id=record.id,
                data_type=record.data_type.value,
                content=json.dumps(record.content),
                created_at=record.created_at,
                updated_at=record.updated_at,
                version=record.version,
                record_metadata=json.dumps(record.metadata)
            )
            session.merge(db_record)
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"Failed to save record: {e}")
            return False
        finally:
            session.close()

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Load record"""
        session = self.SessionLocal()
        try:
            db_record = session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).first()
            if not db_record:
                return None
            return DataRecord(
                id=db_record.id,
                data_type=DataType(db_record.data_type),
                content=json.loads(db_record.content),
                created_at=db_record.created_at,
                updated_at=db_record.updated_at,
                version=db_record.version,
                metadata=json.loads(db_record.record_metadata)
                if db_record.record_metadata else {}
            )
        except Exception as e:
            print(f"Failed to load record: {e}")
            return None
        finally:
            session.close()

    async def delete_record(self, record_id: str) -> bool:
        """Delete record"""
        session = self.SessionLocal()
        try:
            session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).delete()
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"Failed to delete record: {e}")
            return False
        finally:
            session.close()

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query records"""
        session = self.SessionLocal()
        try:
            query = session.query(self.DataRecordTable)
            if data_type:
                query = query.filter(
                    self.DataRecordTable.data_type == data_type.value
                )
            db_records = query.all()
            records = []
            for db_record in db_records:
                record = DataRecord(
                    id=db_record.id,
                    data_type=DataType(db_record.data_type),
                    content=json.loads(db_record.content),
                    created_at=db_record.created_at,
                    updated_at=db_record.updated_at,
                    version=db_record.version,
                    metadata=json.loads(db_record.record_metadata)
                    if db_record.record_metadata else {}
                )
                # Apply filter conditions
                if filters:
                    match = True
                    for key, value in filters.items():
                        if key in record.content and record.content[key] != value:
                            match = False
                            break
                        if key in record.metadata and record.metadata[key] != value:
                            match = False
                            break
                    if not match:
                        continue
                records.append(record)
            return records
        except Exception as e:
            print(f"Failed to query records: {e}")
            return []
        finally:
            session.close()
```
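Both backends implement the same PersistenceStorage interface, so callers can swap one for the other. A short sketch of how they might be instantiated; the SQLite URL and the example resource are assumptions for illustration, and any SQLAlchemy-supported URL would work the same way:

```python
import asyncio

async def demo_storage():
    # File-backed storage: one JSON file per record under ./data
    file_storage = FileStorage(storage_dir="data")

    model = DataModel()
    record = model.create_record(DataType.RESOURCE, {"uri": "file:///tmp/report.txt"})

    await file_storage.save_record(record)
    loaded = await file_storage.load_record(record.id)
    print(loaded.content)

    # Database-backed storage behind the same interface
    db_storage = DatabaseStorage("sqlite:///mcp_records.db")
    await db_storage.save_record(record)
    print(len(await db_storage.query_records(DataType.RESOURCE)))

asyncio.run(demo_storage())
```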
3. Caching Strategy Implementation
```python
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
import json
import time


class CacheStrategy(ABC):
    """Cache strategy base class"""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        pass

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = None):
        """Set cached value"""
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Delete cached value"""
        pass

    @abstractmethod
    async def clear(self):
        """Clear cache"""
        pass


class MemoryCache(CacheStrategy):
    """Memory cache"""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: Dict[str, tuple] = {}

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        if key not in self.cache:
            return None
        value, timestamp, ttl = self.cache[key]
        # Check if expired
        if ttl and time.time() - timestamp > ttl:
            del self.cache[key]
            return None
        return value

    async def set(self, key: str, value: Any, ttl: int = None):
        """Set cached value"""
        # Check cache size
        if len(self.cache) >= self.max_size:
            self._evict()
        ttl = ttl or self.default_ttl
        self.cache[key] = (value, time.time(), ttl)

    async def delete(self, key: str) -> bool:
        """Delete cached value"""
        if key in self.cache:
            del self.cache[key]
            return True
        return False

    async def clear(self):
        """Clear cache"""
        self.cache.clear()

    def _evict(self):
        """Evict a cache entry"""
        # Simple implementation: evict the oldest inserted key (FIFO)
        if self.cache:
            key = next(iter(self.cache))
            del self.cache[key]


class RedisCache(CacheStrategy):
    """Redis cache"""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        # Imported lazily so the redis dependency is only needed when
        # RedisCache is actually used
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url)

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        try:
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            print(f"Failed to get cache: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """Set cached value"""
        try:
            serialized_value = json.dumps(value)
            if ttl:
                await self.redis.setex(key, ttl, serialized_value)
            else:
                await self.redis.set(key, serialized_value)
        except Exception as e:
            print(f"Failed to set cache: {e}")

    async def delete(self, key: str) -> bool:
        """Delete cached value"""
        try:
            result = await self.redis.delete(key)
            return result > 0
        except Exception as e:
            print(f"Failed to delete cache: {e}")
            return False

    async def clear(self):
        """Clear cache"""
        try:
            await self.redis.flushdb()
        except Exception as e:
            print(f"Failed to clear cache: {e}")


class MultiLevelCache:
    """Multi-level cache"""

    def __init__(
        self,
        l1_cache: CacheStrategy,
        l2_cache: CacheStrategy = None
    ):
        self.l1_cache = l1_cache
        self.l2_cache = l2_cache

    async def get(self, key: str) -> Optional[Any]:
        """Get cached value"""
        # Try the L1 cache first
        value = await self.l1_cache.get(key)
        if value is not None:
            return value
        # Fall back to the L2 cache
        if self.l2_cache:
            value = await self.l2_cache.get(key)
            if value is not None:
                # Backfill the L1 cache
                await self.l1_cache.set(key, value)
                return value
        return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """Set cached value"""
        # Write to both L1 and L2
        await self.l1_cache.set(key, value, ttl)
        if self.l2_cache:
            await self.l2_cache.set(key, value, ttl)

    async def delete(self, key: str) -> bool:
        """Delete cached value"""
        # Delete from both L1 and L2
        l1_deleted = await self.l1_cache.delete(key)
        l2_deleted = True
        if self.l2_cache:
            l2_deleted = await self.l2_cache.delete(key)
        return l1_deleted or l2_deleted

    async def clear(self):
        """Clear cache"""
        await self.l1_cache.clear()
        if self.l2_cache:
            await self.l2_cache.clear()
```
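One way to combine these implementations is a small in-process L1 cache in front of Redis as a shared L2. A sketch, assuming a Redis instance is reachable at the default local URL:

```python
import asyncio

async def demo_cache():
    cache = MultiLevelCache(
        l1_cache=MemoryCache(max_size=500, default_ttl=60),
        l2_cache=RedisCache("redis://localhost:6379/0")
    )

    await cache.set("tool:search", {"name": "search", "enabled": True}, ttl=120)

    # An L1 hit avoids the Redis round trip; after an L1 miss the value is
    # read from L2 and backfilled into L1.
    print(await cache.get("tool:search"))

asyncio.run(demo_cache())
```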
4. Data Persistence Manager
```python
from typing import Optional, Dict, Any, List


class DataPersistenceManager:
    """Data persistence manager"""

    def __init__(
        self,
        storage: PersistenceStorage,
        cache: CacheStrategy = None
    ):
        self.storage = storage
        self.cache = cache
        self.data_model = DataModel()

    async def save_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Save record"""
        # Create record
        record = self.data_model.create_record(data_type, content, metadata)
        # Persist to storage
        success = await self.storage.save_record(record)
        if not success:
            return None
        # Update cache
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def load_record(
        self,
        record_id: str,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Load record"""
        # Try the cache first
        if use_cache and self.cache:
            record = await self.cache.get(record_id)
            if record:
                return record
        # Load from storage
        record = await self.storage.load_record(record_id)
        if record and use_cache and self.cache:
            # Update cache
            await self.cache.set(record.id, record)
        return record

    async def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Update record"""
        # Update the in-memory data model
        record = self.data_model.update_record(record_id, content, metadata)
        if not record:
            return None
        # Persist to storage
        success = await self.storage.save_record(record)
        if not success:
            return None
        # Update cache
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def delete_record(
        self,
        record_id: str,
        use_cache: bool = True
    ) -> bool:
        """Delete record"""
        # Delete from storage
        success = await self.storage.delete_record(record_id)
        if not success:
            return False
        # Delete from cache
        if use_cache and self.cache:
            await self.cache.delete(record_id)
        # Delete from the in-memory data model
        self.data_model.delete_record(record_id)
        return True

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Query records"""
        return await self.storage.query_records(data_type, filters)
```
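A sketch of how the manager ties storage and cache together. MemoryCache is used here because it can hold DataRecord objects as-is; a JSON-based cache such as RedisCache would need DataRecord (de)serialization first. The session fields are illustrative:

```python
import asyncio

async def demo_manager():
    manager = DataPersistenceManager(
        storage=FileStorage(storage_dir="data"),
        cache=MemoryCache(max_size=1000, default_ttl=300)
    )

    record = await manager.save_record(
        DataType.SESSION,
        content={"user_id": "u-123", "state": "active"}
    )

    # Second read is served from the cache
    await manager.load_record(record.id)
    await manager.update_record(record.id, content={"state": "closed"})
    await manager.delete_record(record.id)

asyncio.run(demo_manager())
```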
5. Data Backup and Recovery
```python
import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional


class BackupManager:
    """Backup manager"""

    def __init__(self, storage_dir: str = "data", backup_dir: str = "backups"):
        self.storage_dir = Path(storage_dir)
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    async def create_backup(self, backup_name: str = None) -> Optional[str]:
        """Create backup"""
        if not backup_name:
            backup_name = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / backup_name
        try:
            # Create backup directory
            backup_path.mkdir(parents=True, exist_ok=True)
            # Copy data files
            for file_path in self.storage_dir.glob("*.json"):
                shutil.copy2(file_path, backup_path / file_path.name)
            # Create backup metadata
            metadata = {
                "backup_name": backup_name,
                "created_at": datetime.now().isoformat(),
                "file_count": len(list(backup_path.glob("*.json")))
            }
            metadata_path = backup_path / "backup_metadata.json"
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f, indent=2)
            return backup_name
        except Exception as e:
            print(f"Failed to create backup: {e}")
            return None

    async def restore_backup(self, backup_name: str) -> bool:
        """Restore backup"""
        backup_path = self.backup_dir / backup_name
        if not backup_path.exists():
            print(f"Backup does not exist: {backup_name}")
            return False
        try:
            # Clear current data directory
            for file_path in self.storage_dir.glob("*.json"):
                file_path.unlink()
            # Copy backup files
            for file_path in backup_path.glob("*.json"):
                if file_path.name != "backup_metadata.json":
                    shutil.copy2(file_path, self.storage_dir / file_path.name)
            return True
        except Exception as e:
            print(f"Failed to restore backup: {e}")
            return False

    async def list_backups(self) -> List[Dict[str, Any]]:
        """List all backups"""
        backups = []
        for backup_path in self.backup_dir.iterdir():
            if not backup_path.is_dir():
                continue
            metadata_path = backup_path / "backup_metadata.json"
            if metadata_path.exists():
                with open(metadata_path, 'r') as f:
                    backups.append(json.load(f))
        return backups

    async def delete_backup(self, backup_name: str) -> bool:
        """Delete backup"""
        backup_path = self.backup_dir / backup_name
        if backup_path.exists():
            shutil.rmtree(backup_path)
            return True
        return False
```
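The backups are plain directory snapshots of the JSON files written by FileStorage, so BackupManager is meant to sit alongside that backend. A short usage sketch:

```python
import asyncio

async def demo_backup():
    backup_manager = BackupManager(storage_dir="data", backup_dir="backups")

    # Snapshot the current data directory
    name = await backup_manager.create_backup()

    # Inspect existing backups, then roll back to the snapshot
    for meta in await backup_manager.list_backups():
        print(meta["backup_name"], meta["file_count"])
    await backup_manager.restore_backup(name)

asyncio.run(demo_backup())
```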
Best Practices:
- Layered Storage: Choose the appropriate storage type based on data access frequency
- Caching Strategy: Implement multi-level caching to improve access speed
- Data Consistency: Keep cache and storage consistent by updating or invalidating cache entries on every write
- Regular Backup: Create data backups regularly to prevent data loss
- Monitoring and Alerting: Monitor the health of storage and caches
- Performance Optimization: Optimize data access and storage performance (a combined wiring sketch follows this list)
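Putting several of these practices together, a minimal wiring sketch; the component choices, the example record, and the post-write backup are illustrative rather than prescriptive:

```python
import asyncio

async def main():
    # Layered storage: a durable backend behind a two-level cache
    storage = FileStorage(storage_dir="data")
    cache = MultiLevelCache(
        l1_cache=MemoryCache(max_size=1000, default_ttl=60),
        l2_cache=None  # plug in RedisCache(...) when a shared cache is available
    )
    manager = DataPersistenceManager(storage=storage, cache=cache)
    backup_manager = BackupManager(storage_dir="data", backup_dir="backups")

    record = await manager.save_record(DataType.METADATA, {"schema_version": 2})

    # Back up after writes; a long-running server would run this on a schedule
    await backup_manager.create_backup()
    print(record.id)

asyncio.run(main())
```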
With these persistence and caching strategies in place, an MCP system can achieve both high performance and reliability.