乐闻世界logo
搜索文章和话题

面试题手册

MCP 的性能监控和优化有哪些策略?

MCP 的性能监控和优化对于确保系统高效运行至关重要。以下是详细的监控策略和优化方法:性能监控架构MCP 性能监控应考虑以下方面:指标收集:收集系统运行的关键指标性能分析:分析性能瓶颈和优化机会实时监控:实时监控系统状态和性能告警机制:及时发现和通知性能问题优化策略:基于监控数据进行性能优化1. 指标收集系统from dataclasses import dataclassfrom typing import Dict, List, Optionalfrom datetime import datetimeimport time@dataclassclass Metric: """性能指标""" name: str value: float timestamp: datetime tags: Dict[str, str] = None def __post_init__(self): if self.tags is None: self.tags = {}class MetricsCollector: """指标收集器""" def __init__(self): self.metrics: List[Metric] = [] self.counters: Dict[str, int] = {} self.gauges: Dict[str, float] = {} self.histograms: Dict[str, List[float]] = {} def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None): """增加计数器""" key = self._make_key(name, tags) self.counters[key] = self.counters.get(key, 0) + value # 记录指标 self._record_metric(name, self.counters[key], tags) def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None): """设置仪表值""" key = self._make_key(name, tags) self.gauges[key] = value # 记录指标 self._record_metric(name, value, tags) def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None): """记录直方图值""" key = self._make_key(name, tags) if key not in self.histograms: self.histograms[key] = [] self.histograms[key].append(value) # 限制历史记录大小 if len(self.histograms[key]) > 1000: self.histograms[key] = self.histograms[key][-1000:] # 记录指标 self._record_metric(name, value, tags) def _record_metric(self, name: str, value: float, tags: Dict[str, str] = None): """记录指标""" metric = Metric( name=name, value=value, timestamp=datetime.now(), tags=tags or {} ) self.metrics.append(metric) # 限制指标历史大小 if len(self.metrics) > 10000: self.metrics = self.metrics[-10000:] def _make_key(self, name: str, tags: Dict[str, str] = None) -> str: """生成指标键""" if not tags: return name tag_str = ",".join([f"{k}={v}" for k, v in sorted(tags.items())]) return f"{name}{{{tag_str}}}" def get_metrics( self, name: str = None, since: datetime = None ) -> List[Metric]: """获取指标""" metrics = self.metrics if name: metrics = [m for m in metrics if m.name == name] if since: metrics = [m for m in metrics if m.timestamp >= since] return metrics def get_histogram_stats( self, name: str, tags: Dict[str, str] = None ) -> Optional[Dict]: """获取直方图统计""" key = self._make_key(name, tags) if key not in self.histograms: return None values = self.histograms[key] if not values: return None sorted_values = sorted(values) return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), "p50": self._percentile(sorted_values, 50), "p90": self._percentile(sorted_values, 90), "p95": self._percentile(sorted_values, 95), "p99": self._percentile(sorted_values, 99), } def _percentile(self, sorted_values: List[float], percentile: int) -> float: """计算百分位数""" if not sorted_values: return 0.0 index = int(len(sorted_values) * percentile / 100) return sorted_values[min(index, len(sorted_values) - 1)]2. 性能分析器from functools import wrapsfrom typing import Callable, Optionalimport timeclass PerformanceProfiler: """性能分析器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector def profile_function( self, metric_name: str, tags: Dict[str, str] = None ): """函数性能分析装饰器""" def decorator(func: Callable): @wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) # 记录执行时间 execution_time = time.time() - start_time self.metrics_collector.record_histogram( f"{metric_name}_duration", execution_time, tags ) # 记录成功计数 self.metrics_collector.increment_counter( f"{metric_name}_success", tags=tags ) return result except Exception as e: # 记录失败计数 self.metrics_collector.increment_counter( f"{metric_name}_error", tags=tags ) raise e @wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) # 记录执行时间 execution_time = time.time() - start_time self.metrics_collector.record_histogram( f"{metric_name}_duration", execution_time, tags ) # 记录成功计数 self.metrics_collector.increment_counter( f"{metric_name}_success", tags=tags ) return result except Exception as e: # 记录失败计数 self.metrics_collector.increment_counter( f"{metric_name}_error", tags=tags ) raise e # 根据函数类型返回对应的包装器 if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator def profile_context( self, metric_name: str, tags: Dict[str, str] = None ): """上下文管理器性能分析""" class ProfileContext: def __init__(self, profiler, name, tags): self.profiler = profiler self.name = name self.tags = tags self.start_time = None def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_val, exc_tb): execution_time = time.time() - self.start_time self.profiler.metrics_collector.record_histogram( f"{self.name}_duration", execution_time, self.tags ) if exc_type is None: self.profiler.metrics_collector.increment_counter( f"{self.name}_success", tags=self.tags ) else: self.profiler.metrics_collector.increment_counter( f"{self.name}_error", tags=self.tags ) return False return ProfileContext(self, metric_name, tags)3. 实时监控系统import asynciofrom typing import Dict, List, Callable, Optionalclass RealTimeMonitor: """实时监控器""" def __init__( self, metrics_collector: MetricsCollector, check_interval: int = 5 ): self.metrics_collector = metrics_collector self.check_interval = check_interval self.alerts: List[Dict] = [] self.alert_rules: List[Dict] = [] self.subscribers: List[Callable] = [] self.running = False def add_alert_rule( self, name: str, metric_name: str, condition: str, threshold: float, severity: str = "warning" ): """添加告警规则""" self.alert_rules.append({ "name": name, "metric_name": metric_name, "condition": condition, "threshold": threshold, "severity": severity }) def subscribe(self, callback: Callable): """订阅告警""" self.subscribers.append(callback) def unsubscribe(self, callback: Callable): """取消订阅""" if callback in self.subscribers: self.subscribers.remove(callback) async def start(self): """启动监控""" self.running = True while self.running: await self.check_alerts() await asyncio.sleep(self.check_interval) async def stop(self): """停止监控""" self.running = False async def check_alerts(self): """检查告警""" for rule in self.alert_rules: try: alert = await self._evaluate_rule(rule) if alert: self.alerts.append(alert) await self._notify_subscribers(alert) except Exception as e: print(f"检查告警规则失败: {rule['name']}, 错误: {e}") async def _evaluate_rule(self, rule: Dict) -> Optional[Dict]: """评估告警规则""" metric_name = rule["metric_name"] condition = rule["condition"] threshold = rule["threshold"] # 获取最近的指标 recent_metrics = self.metrics_collector.get_metrics( metric_name, since=datetime.now() - timedelta(minutes=1) ) if not recent_metrics: return None # 计算聚合值 values = [m.value for m in recent_metrics] avg_value = sum(values) / len(values) # 检查条件 triggered = False if condition == "greater_than": triggered = avg_value > threshold elif condition == "less_than": triggered = avg_value < threshold elif condition == "equals": triggered = avg_value == threshold elif condition == "not_equals": triggered = avg_value != threshold if triggered: return { "rule_name": rule["name"], "metric_name": metric_name, "current_value": avg_value, "threshold": threshold, "condition": condition, "severity": rule["severity"], "timestamp": datetime.now() } return None async def _notify_subscribers(self, alert: Dict): """通知订阅者""" for callback in self.subscribers: try: await callback(alert) except Exception as e: print(f"通知订阅者失败: {e}") def get_recent_alerts( self, since: datetime = None, severity: str = None ) -> List[Dict]: """获取最近的告警""" alerts = self.alerts if since: alerts = [a for a in alerts if a["timestamp"] >= since] if severity: alerts = [a for a in alerts if a["severity"] == severity] return alerts4. 性能优化策略from typing import Dict, List, Optionalimport asynciofrom functools import lru_cacheclass PerformanceOptimizer: """性能优化器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector self.optimization_strategies = {} def register_strategy( self, name: str, strategy: Callable, priority: int = 0 ): """注册优化策略""" self.optimization_strategies[name] = { "strategy": strategy, "priority": priority } async def optimize(self, context: Dict) -> Dict: """执行优化""" results = {} # 按优先级排序策略 sorted_strategies = sorted( self.optimization_strategies.items(), key=lambda x: x[1]["priority"], reverse=True ) for name, strategy_info in sorted_strategies: try: result = await strategy_info["strategy"](context) results[name] = result except Exception as e: results[name] = {"error": str(e)} return results# 缓存优化策略class CacheOptimizer: """缓存优化器""" def __init__(self, max_size: int = 1000, ttl: int = 300): self.max_size = max_size self.ttl = ttl self.cache: Dict[str, tuple] = {} def get(self, key: str) -> Optional[any]: """获取缓存值""" if key not in self.cache: return None value, timestamp = self.cache[key] # 检查是否过期 if time.time() - timestamp > self.ttl: del self.cache[key] return None return value def set(self, key: str, value: any): """设置缓存值""" # 检查缓存大小 if len(self.cache) >= self.max_size: self._evict() self.cache[key] = (value, time.time()) def _evict(self): """淘汰缓存项""" # 简单实现:随机淘汰 if self.cache: key = next(iter(self.cache)) del self.cache[key]# 连接池优化class ConnectionPoolOptimizer: """连接池优化器""" def __init__(self, min_size: int = 5, max_size: int = 20): self.min_size = min_size self.max_size = max_size self.pool = asyncio.Queue() self.created = 0 self.lock = asyncio.Lock() async def acquire(self) -> any: """获取连接""" try: # 尝试从池中获取连接 connection = await asyncio.wait_for( self.pool.get(), timeout=1.0 ) return connection except asyncio.TimeoutError: # 池中没有可用连接,创建新连接 async with self.lock: if self.created < self.max_size: connection = await self._create_connection() self.created += 1 return connection # 等待其他连接释放 return await self.pool.get() async def release(self, connection: any): """释放连接""" await self.pool.put(connection) async def _create_connection(self) -> any: """创建新连接""" # 实现具体的连接创建逻辑 pass# 批处理优化class BatchProcessor: """批处理器""" def __init__(self, batch_size: int = 100, timeout: float = 1.0): self.batch_size = batch_size self.timeout = timeout self.buffer: List = [] self.lock = asyncio.Lock() async def add(self, item: any): """添加项目到批处理""" async with self.lock: self.buffer.append(item) if len(self.buffer) >= self.batch_size: await self._flush() async def _flush(self): """刷新缓冲区""" if not self.buffer: return batch = self.buffer.copy() self.buffer.clear() # 处理批次 await self._process_batch(batch) async def _process_batch(self, batch: List): """处理批次""" # 实现具体的批处理逻辑 pass async def start_periodic_flush(self): """启动定期刷新""" while True: await asyncio.sleep(self.timeout) async with self.lock: await self._flush()5. 性能报告生成器from typing import Dict, Listfrom datetime import datetime, timedeltaclass PerformanceReportGenerator: """性能报告生成器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector def generate_report( self, start_time: datetime, end_time: datetime ) -> Dict: """生成性能报告""" report = { "period": { "start": start_time, "end": end_time }, "summary": {}, "metrics": {}, "alerts": [], "recommendations": [] } # 生成摘要 report["summary"] = self._generate_summary(start_time, end_time) # 生成指标详情 report["metrics"] = self._generate_metrics_detail( start_time, end_time ) # 生成优化建议 report["recommendations"] = self._generate_recommendations( report["metrics"] ) return report def _generate_summary( self, start_time: datetime, end_time: datetime ) -> Dict: """生成摘要""" summary = { "total_requests": 0, "successful_requests": 0, "failed_requests": 0, "average_response_time": 0.0, "p95_response_time": 0.0, "p99_response_time": 0.0 } # 获取请求指标 success_metrics = self.metrics_collector.get_metrics( "request_success", since=start_time ) error_metrics = self.metrics_collector.get_metrics( "request_error", since=start_time ) duration_metrics = self.metrics_collector.get_metrics( "request_duration", since=start_time ) summary["successful_requests"] = len(success_metrics) summary["failed_requests"] = len(error_metrics) summary["total_requests"] = ( summary["successful_requests"] + summary["failed_requests"] ) if duration_metrics: durations = [m.value for m in duration_metrics] summary["average_response_time"] = sum(durations) / len(durations) summary["p95_response_time"] = self._percentile(durations, 95) summary["p99_response_time"] = self._percentile(durations, 99) return summary def _generate_metrics_detail( self, start_time: datetime, end_time: datetime ) -> Dict: """生成指标详情""" metrics_detail = {} # 获取所有指标名称 metric_names = set(m.name for m in self.metrics_collector.metrics) for metric_name in metric_names: metrics = self.metrics_collector.get_metrics( metric_name, since=start_time ) if not metrics: continue values = [m.value for m in metrics] metrics_detail[metric_name] = { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), "p50": self._percentile(values, 50), "p90": self._percentile(values, 90), "p95": self._percentile(values, 95), "p99": self._percentile(values, 99), } return metrics_detail def _generate_recommendations( self, metrics_detail: Dict ) -> List[str]: """生成优化建议""" recommendations = [] # 检查响应时间 if "request_duration" in metrics_detail: avg_response_time = metrics_detail["request_duration"]["avg"] p95_response_time = metrics_detail["request_duration"]["p95"] if avg_response_time > 1.0: recommendations.append( "平均响应时间超过 1 秒,建议优化慢查询或增加缓存" ) if p95_response_time > 5.0: recommendations.append( "P95 响应时间超过 5 秒,建议检查性能瓶颈" ) # 检查错误率 if "request_error" in metrics_detail: error_count = metrics_detail["request_error"]["count"] if error_count > 100: recommendations.append( "错误数量较多,建议检查错误日志并修复问题" ) return recommendations def _percentile(self, sorted_values: List[float], percentile: int) -> float: """计算百分位数""" if not sorted_values: return 0.0 sorted_values = sorted(sorted_values) index = int(len(sorted_values) * percentile / 100) return sorted_values[min(index, len(sorted_values) - 1)]最佳实践:全面监控:监控所有关键指标,包括请求、响应时间、错误率等实时告警:设置合理的告警阈值,及时发现性能问题性能分析:定期分析性能数据,识别优化机会缓存优化:合理使用缓存减少重复计算和查询连接池:使用连接池优化数据库和网络连接批处理:使用批处理提高吞吐量通过完善的性能监控和优化,可以确保 MCP 系统高效稳定运行。
阅读 0·2月19日 21:43

MCP 的版本管理和兼容性如何处理?

MCP 的版本管理和兼容性对于确保系统的稳定性和可维护性至关重要。以下是详细的版本管理策略和兼容性处理方法:版本管理策略MCP 版本管理应考虑以下方面:语义化版本控制:使用语义化版本号(SemVer)向后兼容性:确保新版本向后兼容旧版本废弃策略:明确的功能废弃和移除流程迁移指南:提供详细的版本迁移指南版本协商:客户端和服务器的版本协商机制1. 语义化版本控制from dataclasses import dataclassfrom typing import Optionalimport re@dataclassclass Version: """版本号""" major: int minor: int patch: int prerelease: Optional[str] = None build_metadata: Optional[str] = None def __str__(self) -> str: version_str = f"{self.major}.{self.minor}.{self.patch}" if self.prerelease: version_str += f"-{self.prerelease}" if self.build_metadata: version_str += f"+{self.build_metadata}" return version_str @classmethod def parse(cls, version_str: str) -> 'Version': """解析版本字符串""" # 匹配语义化版本格式 pattern = r'^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?$' match = re.match(pattern, version_str) if not match: raise ValueError(f"无效的版本格式: {version_str}") major = int(match.group(1)) minor = int(match.group(2)) patch = int(match.group(3)) prerelease = match.group(4) build_metadata = match.group(5) return cls( major=major, minor=minor, patch=patch, prerelease=prerelease, build_metadata=build_metadata ) def is_compatible(self, other: 'Version') -> bool: """检查版本兼容性""" # 主版本号相同,次版本号向后兼容 if self.major != other.major: return False # 如果当前版本 >= 其他版本,则兼容 if (self.minor, self.patch) >= (other.minor, other.patch): return True return False def __lt__(self, other: 'Version') -> bool: """版本比较""" if self.major != other.major: return self.major < other.major if self.minor != other.minor: return self.minor < other.minor if self.patch != other.patch: return self.patch < other.patch # 预发布版本比较 if self.prerelease and not other.prerelease: return True if not self.prerelease and other.prerelease: return False if self.prerelease and other.prerelease: return self.prerelease < other.prerelease return False def __eq__(self, other: 'Version') -> bool: """版本相等""" return ( self.major == other.major and self.minor == other.minor and self.patch == other.patch and self.prerelease == other.prerelease )# 当前 MCP 版本MCP_VERSION = Version(1, 0, 0)2. 版本协商机制from typing import Dict, List, Optionalclass VersionNegotiator: """版本协商器""" def __init__(self, server_version: Version): self.server_version = server_version self.supported_versions: List[Version] = [ Version(1, 0, 0), Version(0, 9, 0), ] def negotiate_version( self, client_versions: List[Version] ) -> Optional[Version]: """协商最佳版本""" # 找到客户端支持的最高版本 client_versions_sorted = sorted(client_versions, reverse=True) for client_version in client_versions_sorted: # 检查服务器是否支持此版本 for server_version in self.supported_versions: if server_version.is_compatible(client_version): return server_version # 没有找到兼容版本 return None def get_server_info(self) -> Dict: """获取服务器版本信息""" return { "version": str(self.server_version), "supported_versions": [ str(v) for v in self.supported_versions ] }class MCPClient: """MCP 客户端""" def __init__(self, supported_versions: List[Version]): self.supported_versions = supported_versions self.negotiated_version: Optional[Version] = None async def connect(self, server_info: Dict) -> bool: """连接到服务器并协商版本""" server_version = Version.parse(server_info["version"]) server_supported_versions = [ Version.parse(v) for v in server_info["supported_versions"] ] # 创建临时协商器 negotiator = VersionNegotiator(server_version) # 协商版本 self.negotiated_version = negotiator.negotiate_version( self.supported_versions ) if not self.negotiated_version: print("无法协商兼容的版本") return False print(f"协商版本: {self.negotiated_version}") return True3. 功能废弃管理from enum import Enumfrom typing import Dict, List, Optionalfrom datetime import datetimeclass DeprecationStatus(Enum): """废弃状态""" STABLE = "stable" DEPRECATED = "deprecated" REMOVED = "removed"@dataclassclass DeprecationInfo: """废弃信息""" status: DeprecationStatus deprecated_in: Optional[Version] = None removed_in: Optional[Version] = None replacement: Optional[str] = None message: Optional[str] = Noneclass DeprecationManager: """废弃管理器""" def __init__(self): self.deprecations: Dict[str, DeprecationInfo] = {} def deprecate_feature( self, feature_name: str, deprecated_in: Version, removed_in: Version, replacement: str = None, message: str = None ): """标记功能为废弃""" self.deprecations[feature_name] = DeprecationInfo( status=DeprecationStatus.DEPRECATED, deprecated_in=deprecated_in, removed_in=removed_in, replacement=replacement, message=message ) def remove_feature(self, feature_name: str, removed_in: Version): """移除功能""" if feature_name in self.deprecations: self.deprecations[feature_name].status = DeprecationStatus.REMOVED self.deprecations[feature_name].removed_in = removed_in def check_feature( self, feature_name: str, current_version: Version ) -> tuple: """检查功能状态""" if feature_name not in self.deprecations: return True, None info = self.deprecations[feature_name] if info.status == DeprecationStatus.REMOVED: return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除" if info.status == DeprecationStatus.DEPRECATED: if current_version >= info.removed_in: return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除" warning = f"功能 {feature_name} 已废弃,将在版本 {info.removed_in} 中移除" if info.replacement: warning += f",请使用 {info.replacement}" return True, warning return True, None def get_deprecation_warnings( self, current_version: Version ) -> List[str]: """获取所有废弃警告""" warnings = [] for feature_name, info in self.deprecations.items(): if info.status == DeprecationStatus.DEPRECATED: if current_version < info.removed_in: warning = f"功能 {feature_name} 已废弃" if info.replacement: warning += f",请使用 {info.replacement}" warnings.append(warning) return warnings4. 版本迁移指南from typing import Dict, List, Callableclass MigrationGuide: """版本迁移指南""" def __init__(self): self.migrations: Dict[Version, List[Migration]] = {} def add_migration( self, from_version: Version, to_version: Version, migration_func: Callable, description: str ): """添加迁移步骤""" if from_version not in self.migrations: self.migrations[from_version] = [] migration = Migration( from_version=from_version, to_version=to_version, func=migration_func, description=description ) self.migrations[from_version].append(migration) def get_migration_path( self, from_version: Version, to_version: Version ) -> List[Migration]: """获取迁移路径""" if from_version == to_version: return [] # 简单实现:直接迁移 if from_version in self.migrations: for migration in self.migrations[from_version]: if migration.to_version == to_version: return [migration] # 复杂实现:查找多步迁移路径 return self._find_migration_path(from_version, to_version) def _find_migration_path( self, from_version: Version, to_version: Version, visited: set = None ) -> List[Migration]: """递归查找迁移路径""" if visited is None: visited = set() if from_version in visited: return [] visited.add(from_version) if from_version == to_version: return [] if from_version not in self.migrations: return [] for migration in self.migrations[from_version]: path = self._find_migration_path( migration.to_version, to_version, visited.copy() ) if path is not None: return [migration] + path return [] async def execute_migration( self, from_version: Version, to_version: Version, context: Dict ) -> bool: """执行迁移""" migration_path = self.get_migration_path(from_version, to_version) if not migration_path: print(f"无法找到从 {from_version} 到 {to_version} 的迁移路径") return False print(f"开始迁移: {from_version} -> {to_version}") for migration in migration_path: print(f"执行迁移: {migration.description}") try: await migration.func(context) print(f"迁移完成: {migration.description}") except Exception as e: print(f"迁移失败: {migration.description}, 错误: {e}") return False print(f"迁移完成: {from_version} -> {to_version}") return True@dataclassclass Migration: """迁移步骤""" from_version: Version to_version: Version func: Callable description: str5. 版本兼容性检查from typing import Dict, List, Tupleclass CompatibilityChecker: """兼容性检查器""" def __init__(self, current_version: Version): self.current_version = current_version self.compatibility_matrix: Dict[Version, List[Version]] = {} def add_compatibility( self, server_version: Version, compatible_client_versions: List[Version] ): """添加兼容性规则""" self.compatibility_matrix[server_version] = compatible_client_versions def check_compatibility( self, client_version: Version ) -> Tuple[bool, Optional[str]]: """检查客户端版本是否兼容""" # 检查服务器是否支持此版本 if self.current_version in self.compatibility_matrix: compatible_versions = self.compatibility_matrix[self.current_version] for compatible_version in compatible_versions: if client_version.is_compatible(compatible_version): return True, None # 使用默认兼容性规则 if self.current_version.is_compatible(client_version): return True, None return False, f"客户端版本 {client_version} 与服务器版本 {self.current_version} 不兼容" def get_compatible_versions(self) -> List[Version]: """获取所有兼容的客户端版本""" compatible_versions = [] if self.current_version in self.compatibility_matrix: compatible_versions = self.compatibility_matrix[self.current_version] return compatible_versions6. 版本信息 APIfrom fastapi import FastAPIfrom typing import Dictclass VersionInfoAPI: """版本信息 API""" def __init__( self, current_version: Version, supported_versions: List[Version], deprecation_manager: DeprecationManager, migration_guide: MigrationGuide ): self.current_version = current_version self.supported_versions = supported_versions self.deprecation_manager = deprecation_manager self.migration_guide = migration_guide def setup_routes(self, app: FastAPI): """设置路由""" @app.get("/version") async def get_version() -> Dict: """获取当前版本信息""" return { "version": str(self.current_version), "supported_versions": [str(v) for v in self.supported_versions], "deprecation_warnings": self.deprecation_manager.get_deprecation_warnings( self.current_version ) } @app.get("/version/compatibility/{client_version}") async def check_compatibility(client_version: str) -> Dict: """检查客户端版本兼容性""" checker = CompatibilityChecker(self.current_version) try: version = Version.parse(client_version) except ValueError: return { "compatible": False, "message": "无效的版本格式" } compatible, message = checker.check_compatibility(version) return { "compatible": compatible, "message": message, "server_version": str(self.current_version) } @app.get("/version/migration/{from_version}/{to_version}") async def get_migration_path( from_version: str, to_version: str ) -> Dict: """获取迁移路径""" try: from_ver = Version.parse(from_version) to_ver = Version.parse(to_version) except ValueError: return { "success": False, "message": "无效的版本格式" } path = self.migration_guide.get_migration_path(from_ver, to_ver) return { "success": True, "migration_path": [ { "from": str(m.from_version), "to": str(m.to_version), "description": m.description } for m in path ] }最佳实践:语义化版本:严格遵循语义化版本控制规范向后兼容:确保新版本向后兼容旧版本渐进废弃:提前通知功能废弃,给用户迁移时间文档完善:提供详细的版本迁移指南版本协商:实现客户端和服务器的版本协商机制测试覆盖:为每个版本编写兼容性测试通过完善的版本管理和兼容性处理,可以确保 MCP 系统的稳定性和可维护性。
阅读 0·2月19日 21:43

MCP 的未来发展趋势是什么?有哪些挑战和机遇?

MCP 作为新兴的 AI 集成协议,具有广阔的发展前景和潜力。以下是 MCP 的未来发展趋势:1. 标准化推进行业认可:获得更多 AI 模型提供商和企业的认可协议完善:持续完善协议规范,解决现有局限性标准化组织:可能提交给标准化组织(如 W3C、IETF)进行标准化互操作性增强:与现有协议(如 OpenAPI、GraphQL)的互操作性2. 生态系统扩展更多语言支持:扩展到 Rust、Java、C#、PHP 等更多编程语言服务器生态:社区贡献更多针对特定领域的 MCP 服务器客户端集成:更多 AI 应用和平台原生支持 MCP工具库丰富:提供更多预构建的工具和资源3. 性能优化协议优化:引入二进制协议、压缩、批量操作等优化异步增强:更强大的异步和流式处理能力缓存机制:智能缓存策略减少重复计算边缘计算:支持边缘节点部署,降低延迟4. 安全性增强高级认证:支持 OAuth 2.0、SAML 等企业级认证细粒度权限:更精细的访问控制和权限管理安全审计:完整的安全审计和合规支持加密增强:端到端加密和密钥管理5. 功能扩展实时通信:支持 WebSocket 等实时双向通信流式处理:更好的流式数据处理能力事件驱动:支持事件订阅和推送机制多模态支持:增强对图像、音频、视频等多模态数据的支持6. 企业级特性多租户支持:完善的多租户隔离和管理高可用性:内置高可用和灾难恢复机制可观测性:完整的监控、日志和追踪能力治理工具:提供企业级治理和管理工具7. AI 模型集成更多模型支持:支持更多开源和商业 AI 模型模型适配器:提供模型适配器简化集成性能优化:针对不同模型的性能优化成本控制:智能的成本控制和优化8. 开发者体验更好的工具:更强大的开发、测试和调试工具文档完善:更全面和易懂的文档示例丰富:更多实际应用场景的示例社区支持:活跃的社区支持和交流9. 应用场景拓展企业应用:更多企业级应用场景物联网:IoT 设备和系统的集成边缘 AI:边缘计算和 AI 的结合自动化:更广泛的自动化应用10. 挑战和机遇挑战:与现有协议的竞争和兼容性社区建设和生态发展性能和可扩展性的平衡安全性和易用性的权衡机遇:成为 AI 集成的行业标准推动AI应用的大规模部署促进AI技术的民主化创造新的商业模式和机会预测:未来 2-3 年内,MCP 有望成为 AI 模型与外部系统集成的主流标准之一,被广泛采用于企业级应用、开发工具和各种 AI 产品中。其开放性和标准化特性将推动整个 AI 生态系统的发展。开发者现在学习和采用 MCP,将能够在未来的 AI 应用开发中占据有利位置。
阅读 0·2月19日 21:41

MCP 如何与其他 AI 框架(如 LangChain、LlamaIndex)集成?

MCP 可以与各种 AI 框架和工具集成,扩展其功能和应用场景。以下是详细的集成方法和最佳实践:集成架构设计MCP 集成应考虑以下方面:框架兼容性:确保与目标框架的兼容性性能影响:最小化对系统性能的影响功能完整性:保持 MCP 和框架功能的完整性错误处理:正确处理集成过程中的错误配置管理:统一管理集成配置1. 与 LangChain 集成from langchain.agents import AgentExecutor, create_openai_tools_agentfrom langchain_openai import ChatOpenAIfrom langchain.tools import Toolfrom mcp.server import Serverclass MCPLangChainIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.langchain_tools = [] async def convert_mcp_to_langchain_tools(self) -> list: """将 MCP 工具转换为 LangChain 工具""" mcp_tools = await self.mcp_server.list_tools() langchain_tools = [] for tool_info in mcp_tools: tool = Tool( name=tool_info["name"], description=tool_info["description"], func=self._create_tool_wrapper(tool_info["name"]) ) langchain_tools.append(tool) return langchain_tools def _create_tool_wrapper(self, tool_name: str): """创建工具包装器""" async def wrapper(**kwargs): result = await self.mcp_server.call_tool( tool_name, kwargs ) return result return wrapper async def create_langchain_agent(self): """创建 LangChain Agent""" # 转换 MCP 工具 tools = await self.convert_mcp_to_langchain_tools() # 创建 LLM llm = ChatOpenAI( model="gpt-4", temperature=0 ) # 创建 Agent agent = create_openai_tools_agent(llm, tools) # 创建 AgentExecutor agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True ) return agent_executor async def run_agent(self, query: str): """运行 Agent""" agent_executor = await self.create_langchain_agent() result = await agent_executor.ainvoke({ "input": query }) return result["output"]2. 与 LlamaIndex 集成from llama_index.core import VectorStoreIndex, SimpleDirectoryReaderfrom llama_index.core.tools import QueryEngineTool, ToolMetadatafrom llama_index.core.agent import ReActAgentfrom mcp.server import Serverclass MCPLlamaIndexIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server async def create_mcp_query_engine(self, tool_name: str): """创建 MCP 查询引擎""" async def query_engine_fn(query: str) -> str: result = await self.mcp_server.call_tool( tool_name, {"query": query} ) return result return query_engine_fn async def create_llamaindex_tools(self) -> list: """创建 LlamaIndex 工具""" mcp_tools = await self.mcp_server.list_tools() tools = [] for tool_info in mcp_tools: query_engine = await self.create_mcp_query_engine( tool_info["name"] ) tool = QueryEngineTool( query_engine=query_engine, metadata=ToolMetadata( name=tool_info["name"], description=tool_info["description"] ) ) tools.append(tool) return tools async def create_react_agent(self): """创建 ReAct Agent""" tools = await self.create_llamaindex_tools() agent = ReActAgent.from_tools( tools=tools, verbose=True ) return agent async def query_with_agent(self, query: str): """使用 Agent 查询""" agent = await self.create_react_agent() response = agent.query(query) return response.response3. 与 AutoGPT 集成from autogpt.agent.agent import Agentfrom autogpt.config import Configfrom autogpt.models.command import Commandfrom mcp.server import Serverclass MCPAutoGPTIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.command_registry = {} async def register_mcp_commands(self): """注册 MCP 命令""" mcp_tools = await self.mcp_server.list_tools() for tool_info in mcp_tools: command = Command( name=tool_info["name"], description=tool_info["description"], function=self._create_command_function(tool_info["name"]) ) self.command_registry[tool_info["name"]] = command def _create_command_function(self, tool_name: str): """创建命令函数""" async def command_function(**kwargs): result = await self.mcp_server.call_tool( tool_name, kwargs ) return result return command_function async def create_autogpt_agent(self, config: Config): """创建 AutoGPT Agent""" # 注册 MCP 命令 await self.register_mcp_commands() # 创建 Agent agent = Agent( ai_name="MCP-Agent", ai_role="Assistant", commands=self.command_registry, config=config ) return agent async def run_autogpt_task(self, task: str): """运行 AutoGPT 任务""" config = Config() agent = await self.create_autogpt_agent(config) result = await agent.run(task) return result4. 与 FastAPI 集成from fastapi import FastAPI, HTTPException, Dependsfrom fastapi.middleware.cors import CORSMiddlewarefrom mcp.server import Serverfrom pydantic import BaseModelclass MCPFastAPIIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.app = FastAPI(title="MCP API") self._setup_middleware() self._setup_routes() def _setup_middleware(self): """设置中间件""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def _setup_routes(self): """设置路由""" @self.app.get("/tools") async def list_tools(): """列出所有工具""" tools = await self.mcp_server.list_tools() return {"tools": tools} @self.app.post("/tools/{tool_name}/call") async def call_tool(tool_name: str, params: dict): """调用工具""" try: result = await self.mcp_server.call_tool( tool_name, params ) return {"result": result} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/resources") async def list_resources(): """列出所有资源""" resources = await self.mcp_server.list_resources() return {"resources": resources} @self.app.get("/resources/{uri}") async def read_resource(uri: str): """读取资源""" try: content = await self.mcp_server.read_resource(uri) return {"content": content} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/prompts") async def list_prompts(): """列出所有提示词""" prompts = await self.mcp_server.list_prompts() return {"prompts": prompts} @self.app.get("/health") async def health_check(): """健康检查""" return {"status": "healthy"} def get_app(self) -> FastAPI: """获取 FastAPI 应用""" return self.app5. 与 WebSocket 集成import asyncioimport jsonfrom fastapi import WebSocketfrom mcp.server import Serverclass MCPWebSocketIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.active_connections = [] async def handle_websocket(self, websocket: WebSocket): """处理 WebSocket 连接""" await websocket.accept() self.active_connections.append(websocket) try: while True: # 接收消息 data = await websocket.receive_text() message = json.loads(data) # 处理消息 response = await self._handle_message(message) # 发送响应 await websocket.send_text(json.dumps(response)) except Exception as e: print(f"WebSocket 错误: {e}") finally: self.active_connections.remove(websocket) async def _handle_message(self, message: dict) -> dict: """处理消息""" message_type = message.get("type") if message_type == "list_tools": tools = await self.mcp_server.list_tools() return {"type": "tools_list", "data": tools} elif message_type == "call_tool": result = await self.mcp_server.call_tool( message["tool_name"], message.get("params", {}) ) return {"type": "tool_result", "data": result} elif message_type == "list_resources": resources = await self.mcp_server.list_resources() return {"type": "resources_list", "data": resources} else: return {"type": "error", "message": "未知消息类型"} async def broadcast_message(self, message: dict): """广播消息到所有连接""" message_text = json.dumps(message) for connection in self.active_connections: try: await connection.send_text(message_text) except Exception as e: print(f"发送消息失败: {e}")6. 与 GraphQL 集成import strawberryfrom strawberry.types import Infofrom mcp.server import Server@strawberry.typeclass MCPTool: name: str description: str@strawberry.typeclass MCPResource: uri: str name: str description: str@strawberry.typeclass Query: @strawberry.field async def tools(self, info: Info) -> list[MCPTool]: """获取所有工具""" mcp_server = info.context["mcp_server"] tools = await mcp_server.list_tools() return [ MCPTool( name=tool["name"], description=tool["description"] ) for tool in tools ] @strawberry.field async def resources(self, info: Info) -> list[MCPResource]: """获取所有资源""" mcp_server = info.context["mcp_server"] resources = await mcp_server.list_resources() return [ MCPResource( uri=resource["uri"], name=resource["name"], description=resource["description"] ) for resource in resources ]@strawberry.typeclass Mutation: @strawberry.mutation async def call_tool( self, tool_name: str, params: dict, info: Info ) -> str: """调用工具""" mcp_server = info.context["mcp_server"] result = await mcp_server.call_tool(tool_name, params) return str(result)class MCPGraphQLIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.schema = strawberry.Schema( query=Query, mutation=Mutation ) async def execute_query(self, query: str, variables: dict = None): """执行 GraphQL 查询""" context = {"mcp_server": self.mcp_server} result = await self.schema.execute( query, variable_values=variables, context_value=context ) if result.errors: return { "errors": [str(error) for error in result.errors] } return {"data": result.data}7. 与 gRPC 集成import grpcfrom concurrent import futuresimport mcp_pb2import mcp_pb2_grpcfrom mcp.server import Serverclass MCPServicer(mcp_pb2_grpc.MCPServicer): def __init__(self, mcp_server: Server): self.mcp_server = mcp_server async def ListTools( self, request: mcp_pb2.ListToolsRequest, context: grpc.ServicerContext ) -> mcp_pb2.ListToolsResponse: """列出工具""" tools = await self.mcp_server.list_tools() tool_protos = [ mcp_pb2.Tool( name=tool["name"], description=tool["description"] ) for tool in tools ] return mcp_pb2.ListToolsResponse(tools=tool_protos) async def CallTool( self, request: mcp_pb2.CallToolRequest, context: grpc.ServicerContext ) -> mcp_pb2.CallToolResponse: """调用工具""" params = dict(request.params) result = await self.mcp_server.call_tool( request.tool_name, params ) return mcp_pb2.CallToolResponse(result=str(result))class MCPGRPCIntegration: def __init__(self, mcp_server: Server, port: int = 50051): self.mcp_server = mcp_server self.port = port self.server = None async def start_server(self): """启动 gRPC 服务器""" self.server = grpc.aio.server( futures.ThreadPoolExecutor(max_workers=10) ) mcp_pb2_grpc.add_MCPServicer_to_server( MCPServicer(self.mcp_server), self.server ) self.server.add_insecure_port(f'[::]:{self.port}') await self.server.start() print(f"gRPC 服务器启动在端口 {self.port}") async def stop_server(self): """停止 gRPC 服务器""" if self.server: await self.server.stop(0) print("gRPC 服务器已停止")最佳实践:异步处理:使用异步编程避免阻塞错误处理:正确处理集成过程中的错误性能优化:缓存频繁调用的结果日志记录:记录所有集成操作测试覆盖:编写集成测试确保功能正常文档完善:提供清晰的集成文档通过与其他 AI 框架和工具的集成,可以扩展 MCP 的功能和应用场景。
阅读 0·2月19日 21:40

MCP 的安全性设计有哪些关键机制?

MCP 的安全性设计包含多个层面,确保 AI 模型与外部系统的交互是安全可控的:1. 认证和授权机制身份认证:支持多种认证方式(API Key、OAuth、JWT 等)访问控制:基于角色的权限管理(RBAC)令牌管理:安全的令牌生成、验证和刷新机制多租户支持:隔离不同用户或租户的数据和资源2. 通信安全加密传输:强制使用 TLS/SSL 加密所有通信证书验证:严格的证书验证和吊销检查安全协议:基于 JSON-RPC 2.0 的安全扩展防止中间人攻击:完整的证书链验证3. 输入验证和清理参数验证:严格验证所有输入参数的类型和格式SQL 注入防护:使用参数化查询,防止 SQL 注入XSS 防护:清理和转义用户输入,防止跨站脚本攻击命令注入防护:限制和验证系统命令执行4. 资源访问控制文件系统隔离:限制可访问的文件路径和权限网络访问限制:白名单机制控制外部网络访问资源配额:限制 CPU、内存、磁盘等资源使用操作审计:记录所有资源访问和修改操作5. 执行环境安全沙箱隔离:在隔离的沙箱环境中执行代码权限最小化:只授予必要的最小权限超时控制:设置执行超时,防止无限循环资源限制:限制内存、CPU 等资源使用6. 错误处理和日志安全错误消息:不泄露敏感信息的错误提示详细日志记录:记录所有操作和安全事件审计追踪:完整的操作审计链异常监控:实时监控异常行为7. 数据保护数据加密:敏感数据加密存储和传输数据脱敏:日志和错误消息中的敏感数据脱敏数据隔离:不同用户数据的严格隔离数据备份:安全的数据备份和恢复机制8. 速率限制和防护请求限流:防止 API 滥用和 DDoS 攻击并发控制:限制并发请求数量黑名单机制:阻止恶意 IP 或用户异常检测:检测和阻止异常行为模式安全最佳实践:定期进行安全审计和渗透测试及时更新依赖库和框架实施最小权限原则建立安全事件响应流程提供安全配置指南和文档通过这些多层安全机制,MCP 能够在提供强大功能的同时,确保系统的安全性和可靠性。
阅读 0·2月19日 21:40

如何对 MCP 进行测试?有哪些测试策略和最佳实践?

MCP 的测试策略对于确保系统质量和可靠性至关重要。以下是详细的测试方法和最佳实践:测试层次结构MCP 测试应涵盖以下层次:单元测试:测试单个函数和组件集成测试:测试组件之间的交互端到端测试:测试完整的用户场景性能测试:测试系统性能和可扩展性安全测试:测试安全漏洞和防护机制1. 单元测试import pytestfrom unittest.mock import Mock, AsyncMockfrom mcp.server import Serverclass TestMCPTools: @pytest.fixture def server(self): """创建测试服务器实例""" return Server("test-server") @pytest.mark.asyncio async def test_tool_registration(self, server): """测试工具注册""" @server.tool( name="test_tool", description="测试工具" ) async def test_tool(param: str) -> str: return f"Result: {param}" # 验证工具已注册 tools = await server.list_tools() assert any(t["name"] == "test_tool" for t in tools) @pytest.mark.asyncio async def test_tool_execution(self, server): """测试工具执行""" @server.tool( name="calculate", description="计算工具" ) async def calculate(a: int, b: int) -> int: return a + b # 执行工具 result = await server.call_tool("calculate", {"a": 2, "b": 3}) assert result == 5 @pytest.mark.asyncio async def test_parameter_validation(self, server): """测试参数验证""" @server.tool( name="validate", description="参数验证工具", inputSchema={ "type": "object", "properties": { "email": { "type": "string", "format": "email" } }, "required": ["email"] } ) async def validate(email: str) -> str: return f"Valid: {email}" # 测试有效参数 result = await server.call_tool( "validate", {"email": "test@example.com"} ) assert "Valid" in result # 测试无效参数 with pytest.raises(ValueError): await server.call_tool("validate", {"email": "invalid"})2. 集成测试class TestMCPIntegration: @pytest.mark.asyncio async def test_client_server_communication(self): """测试客户端-服务器通信""" from mcp.client import Client from mcp.server import Server # 创建服务器 server = Server("integration-test-server") @server.tool(name="echo", description="回显工具") async def echo(message: str) -> str: return message # 启动服务器 await server.start() try: # 创建客户端 client = Client("http://localhost:8000") # 测试通信 result = await client.call_tool("echo", {"message": "Hello"}) assert result == "Hello" finally: await server.stop() @pytest.mark.asyncio async def test_resource_access(self): """测试资源访问""" server = Server("resource-test-server") @server.resource( uri="file:///test.txt", name="测试文件", description="测试资源" ) async def test_resource() -> str: return "Test content" await server.start() try: client = Client("http://localhost:8000") # 读取资源 content = await client.read_resource("file:///test.txt") assert content == "Test content" finally: await server.stop()3. 端到端测试class TestMCPEndToEnd: @pytest.mark.asyncio async def test_complete_workflow(self): """测试完整工作流""" # 模拟用户场景:查询数据库并生成报告 server = Server("e2e-test-server") @server.tool(name="query_db", description="查询数据库") async def query_db(query: str) -> list: return [{"id": 1, "name": "Test"}] @server.tool(name="generate_report", description="生成报告") async def generate_report(data: list) -> str: return f"Report: {len(data)} items" await server.start() try: client = Client("http://localhost:8000") # 执行工作流 data = await client.call_tool("query_db", {"query": "SELECT *"}) report = await client.call_tool("generate_report", {"data": data}) assert "1 items" in report finally: await server.stop()4. 性能测试import asyncioimport timefrom locust import HttpUser, task, betweenclass MCPPerformanceTest(HttpUser): wait_time = between(1, 3) def on_start(self): """测试开始时的初始化""" self.client = Client(self.host) @task def tool_call_performance(self): """测试工具调用性能""" start_time = time.time() result = self.client.call_tool("test_tool", {"param": "value"}) elapsed = time.time() - start_time # 断言响应时间 assert elapsed < 1.0, f"响应时间过长: {elapsed}s" @task def concurrent_requests(self): """测试并发请求""" async def make_request(): return self.client.call_tool("test_tool", {"param": "value"}) # 并发执行 10 个请求 tasks = [make_request() for _ in range(10)] results = asyncio.run(asyncio.gather(*tasks)) # 验证所有请求都成功 assert all(results)5. 安全测试class TestMCPSecurity: @pytest.mark.asyncio async def test_authentication(self): """测试认证机制""" server = Server("security-test-server") # 配置认证 server.set_authenticator(lambda token: token == "valid-token") @server.tool(name="secure_tool", description="安全工具") async def secure_tool() -> str: return "Secure data" await server.start() try: # 测试有效令牌 client = Client("http://localhost:8000", token="valid-token") result = await client.call_tool("secure_tool", {}) assert result == "Secure data" # 测试无效令牌 client_invalid = Client("http://localhost:8000", token="invalid-token") with pytest.raises(AuthenticationError): await client_invalid.call_tool("secure_tool", {}) finally: await server.stop() @pytest.mark.asyncio async def test_sql_injection_prevention(self): """测试 SQL 注入防护""" server = Server("sql-injection-test-server") @server.tool(name="query", description="查询工具") async def query(sql: str) -> list: # 应该使用参数化查询 return execute_safe_query(sql) # 测试 SQL 注入尝试 malicious_sql = "SELECT * FROM users WHERE '1'='1'" result = await server.call_tool("query", {"sql": malicious_sql}) # 验证注入被阻止 assert result == [] @pytest.mark.asyncio async def test_rate_limiting(self): """测试速率限制""" server = Server("rate-limit-test-server") # 配置速率限制 server.set_rate_limit(max_requests=10, window=60) @server.tool(name="limited_tool", description="受限工具") async def limited_tool() -> str: return "Success" # 快速发送多个请求 for i in range(15): try: await server.call_tool("limited_tool", {}) except RateLimitError: # 预期的速率限制错误 assert i >= 10 break else: pytest.fail("未触发速率限制")6. Mock 和 Stubfrom unittest.mock import Mock, patchclass TestMCPWithMocks: @pytest.mark.asyncio async def test_with_external_dependency_mock(self): """使用 Mock 测试外部依赖""" server = Server("mock-test-server") @server.tool(name="fetch_data", description="获取数据") async def fetch_data(url: str) -> dict: # Mock 外部 API 调用 with patch('requests.get') as mock_get: mock_get.return_value.json.return_value = { "data": "mocked" } response = requests.get(url) return response.json() result = await server.call_tool( "fetch_data", {"url": "http://api.example.com"} ) assert result == {"data": "mocked"} mock_get.assert_called_once()7. 测试覆盖率# 使用 pytest-cov 生成覆盖率报告# 运行命令: pytest --cov=mcp --cov-report=htmlclass TestCoverage: @pytest.mark.asyncio async def test_all_code_paths(self): """测试所有代码路径""" server = Server("coverage-test-server") @server.tool(name="complex_tool", description="复杂工具") async def complex_tool(condition: bool) -> str: if condition: return "Branch A" else: return "Branch B" # 测试所有分支 result_a = await server.call_tool("complex_tool", {"condition": True}) assert result_a == "Branch A" result_b = await server.call_tool("complex_tool", {"condition": False}) assert result_b == "Branch B"最佳实践:测试金字塔:大量单元测试,适量集成测试,少量端到端测试独立性:每个测试应该独立运行,不依赖其他测试可重复性:测试结果应该可重复,不受环境因素影响快速反馈:单元测试应该快速执行,提供快速反馈持续集成:将测试集成到 CI/CD 流程中覆盖率目标:设定合理的代码覆盖率目标(如 80%)通过完善的测试策略,可以确保 MCP 系统的质量和可靠性。
阅读 0·2月19日 21:40

MCP 的插件系统是如何工作的?

MCP 的插件系统允许开发者扩展 MCP 服务器的功能,无需修改核心代码。以下是详细的插件架构和实现方法:插件架构设计MCP 插件系统应考虑以下方面:插件发现:自动发现和加载插件插件生命周期:管理插件的加载、初始化、卸载插件隔离:确保插件之间相互隔离插件通信:提供插件间的通信机制插件安全:限制插件的权限和资源访问1. 插件接口定义from abc import ABC, abstractmethodfrom typing import Dict, Any, Listclass MCPPlugin(ABC): """MCP 插件基类""" def __init__(self, config: Dict[str, Any] = None): self.config = config or {} self.name = self.__class__.__name__ self.version = getattr(self, 'VERSION', '1.0.0') @abstractmethod async def initialize(self, server): """初始化插件""" pass @abstractmethod async def shutdown(self): """关闭插件""" pass @abstractmethod def get_tools(self) -> List[Dict[str, Any]]: """获取插件提供的工具""" return [] @abstractmethod def get_resources(self) -> List[Dict[str, Any]]: """获取插件提供的资源""" return [] @abstractmethod def get_prompts(self) -> List[Dict[str, Any]]: """获取插件提供的提示词""" return [] def get_metadata(self) -> Dict[str, Any]: """获取插件元数据""" return { "name": self.name, "version": self.version, "description": getattr(self, 'DESCRIPTION', ''), "author": getattr(self, 'AUTHOR', ''), "dependencies": getattr(self, 'DEPENDENCIES', []) }2. 插件管理器import importlibimport inspectimport osfrom pathlib import Pathfrom typing import Dict, List, Typeclass PluginManager: def __init__(self, server): self.server = server self.plugins: Dict[str, MCPPlugin] = {} self.plugin_directories: List[str] = [] def add_plugin_directory(self, directory: str): """添加插件目录""" if directory not in self.plugin_directories: self.plugin_directories.append(directory) async def discover_plugins(self): """发现插件""" discovered_plugins = [] for directory in self.plugin_directories: plugin_path = Path(directory) if not plugin_path.exists(): continue # 遍历插件目录 for item in plugin_path.iterdir(): if item.is_file() and item.suffix == '.py': # 单文件插件 discovered_plugins.append(str(item)) elif item.is_dir() and (item / '__init__.py').exists(): # 包插件 discovered_plugins.append(str(item)) return discovered_plugins async def load_plugin(self, plugin_path: str) -> bool: """加载插件""" try: # 动态导入插件模块 spec = importlib.util.spec_from_file_location( "plugin_module", plugin_path ) if spec is None or spec.loader is None: return False module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # 查找插件类 plugin_class = None for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, MCPPlugin) and obj is not MCPPlugin): plugin_class = obj break if plugin_class is None: return False # 创建插件实例 plugin = plugin_class() # 初始化插件 await plugin.initialize(self.server) # 注册插件 self.plugins[plugin.name] = plugin # 注册插件提供的工具、资源、提示词 self._register_plugin_tools(plugin) self._register_plugin_resources(plugin) self._register_plugin_prompts(plugin) return True except Exception as e: print(f"加载插件失败 {plugin_path}: {e}") return False def _register_plugin_tools(self, plugin: MCPPlugin): """注册插件工具""" tools = plugin.get_tools() for tool_info in tools: @self.server.tool( name=tool_info["name"], description=tool_info["description"] ) async def tool_wrapper(**kwargs): return await tool_info["function"](**kwargs) def _register_plugin_resources(self, plugin: MCPPlugin): """注册插件资源""" resources = plugin.get_resources() for resource_info in resources: @self.server.resource( uri=resource_info["uri"], name=resource_info["name"], description=resource_info["description"] ) async def resource_wrapper(): return await resource_info["function"]() def _register_plugin_prompts(self, plugin: MCPPlugin): """注册插件提示词""" prompts = plugin.get_prompts() for prompt_info in prompts: @self.server.prompt( name=prompt_info["name"], description=prompt_info["description"] ) async def prompt_wrapper(**kwargs): return await prompt_info["function"](**kwargs) async def unload_plugin(self, plugin_name: str) -> bool: """卸载插件""" if plugin_name not in self.plugins: return False plugin = self.plugins[plugin_name] try: # 关闭插件 await plugin.shutdown() # 从插件列表中移除 del self.plugins[plugin_name] return True except Exception as e: print(f"卸载插件失败 {plugin_name}: {e}") return False async def reload_plugin(self, plugin_name: str) -> bool: """重新加载插件""" if plugin_name not in self.plugins: return False # 先卸载 await self.unload_plugin(plugin_name) # 重新加载(需要记录插件路径) # 这里简化处理,实际需要保存插件路径 return True def get_plugin_info(self, plugin_name: str) -> Dict[str, Any]: """获取插件信息""" if plugin_name not in self.plugins: return {} plugin = self.plugins[plugin_name] return plugin.get_metadata() def list_plugins(self) -> List[Dict[str, Any]]: """列出所有插件""" return [ plugin.get_metadata() for plugin in self.plugins.values() ]3. 示例插件实现# plugins/database_plugin.pyclass DatabasePlugin(MCPPlugin): """数据库插件""" VERSION = "1.0.0" DESCRIPTION = "提供数据库查询和管理功能" AUTHOR = "Your Name" DEPENDENCIES = ["sqlalchemy"] def __init__(self, config: Dict[str, Any] = None): super().__init__(config) self.db_connection = None async def initialize(self, server): """初始化数据库连接""" from sqlalchemy import create_engine db_url = self.config.get("database_url", "sqlite:///mcp.db") self.db_connection = create_engine(db_url) print(f"数据库插件 {self.name} 已初始化") async def shutdown(self): """关闭数据库连接""" if self.db_connection: self.db_connection.dispose() print(f"数据库插件 {self.name} 已关闭") def get_tools(self) -> List[Dict[str, Any]]: """获取数据库工具""" return [ { "name": "query_database", "description": "执行数据库查询", "function": self._query_database }, { "name": "execute_sql", "description": "执行 SQL 语句", "function": self._execute_sql } ] def get_resources(self) -> List[Dict[str, Any]]: """获取数据库资源""" return [ { "uri": "db://schema", "name": "数据库模式", "description": "数据库表结构", "function": self._get_schema } ] def get_prompts(self) -> List[Dict[str, Any]]: """获取数据库提示词""" return [ { "name": "generate_query", "description": "生成 SQL 查询提示词", "function": self._generate_query_prompt } ] async def _query_database(self, query: str) -> str: """执行数据库查询""" from sqlalchemy import text with self.db_connection.connect() as conn: result = conn.execute(text(query)) rows = result.fetchall() return f"查询结果: {len(rows)} 行" async def _execute_sql(self, sql: str) -> str: """执行 SQL 语句""" from sqlalchemy import text with self.db_connection.connect() as conn: conn.execute(text(sql)) conn.commit() return "SQL 执行成功" async def _get_schema(self) -> str: """获取数据库模式""" from sqlalchemy import inspect inspector = inspect(self.db_connection) tables = inspector.get_table_names() return f"数据库表: {', '.join(tables)}" async def _generate_query_prompt(self, table_name: str) -> str: """生成查询提示词""" return f""" 请为表 {table_name} 生成 SQL 查询语句。 要求: 1. 只使用 SELECT 查询 2. 包含适当的 WHERE 条件 3. 添加 LIMIT 限制结果数量 """4. 插件配置import jsonfrom pathlib import Pathclass PluginConfig: def __init__(self, config_dir: str = "config/plugins"): self.config_dir = Path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) def load_config(self, plugin_name: str) -> Dict[str, Any]: """加载插件配置""" config_file = self.config_dir / f"{plugin_name}.json" if not config_file.exists(): return {} with open(config_file, 'r') as f: return json.load(f) def save_config( self, plugin_name: str, config: Dict[str, Any] ): """保存插件配置""" config_file = self.config_dir / f"{plugin_name}.json" with open(config_file, 'w') as f: json.dump(config, f, indent=2) def get_all_configs(self) -> Dict[str, Dict[str, Any]]: """获取所有插件配置""" configs = {} for config_file in self.config_dir.glob("*.json"): plugin_name = config_file.stem with open(config_file, 'r') as f: configs[plugin_name] = json.load(f) return configs5. 插件依赖管理from typing import Dict, List, Setclass PluginDependencyManager: def __init__(self): self.dependencies: Dict[str, Set[str]] = {} self.dependents: Dict[str, Set[str]] = {} def add_dependency(self, plugin_name: str, dependency: str): """添加依赖关系""" if plugin_name not in self.dependencies: self.dependencies[plugin_name] = set() self.dependencies[plugin_name].add(dependency) if dependency not in self.dependents: self.dependents[dependency] = set() self.dependents[dependency].add(plugin_name) def get_load_order(self, plugins: List[str]) -> List[str]: """获取插件加载顺序(拓扑排序)""" visited = set() result = [] def visit(plugin_name: str): if plugin_name in visited: return visited.add(plugin_name) # 先加载依赖 for dep in self.dependencies.get(plugin_name, []): visit(dep) result.append(plugin_name) for plugin_name in plugins: visit(plugin_name) return result def check_circular_dependency(self) -> bool: """检查循环依赖""" visited = set() recursion_stack = set() def has_cycle(plugin_name: str) -> bool: visited.add(plugin_name) recursion_stack.add(plugin_name) for dep in self.dependencies.get(plugin_name, []): if dep not in visited: if has_cycle(dep): return True elif dep in recursion_stack: return True recursion_stack.remove(plugin_name) return False for plugin_name in self.dependencies: if plugin_name not in visited: if has_cycle(plugin_name): return True return False6. 插件沙箱import sysfrom typing import Anyclass PluginSandbox: def __init__(self): self.restricted_modules = { 'os', 'sys', 'subprocess', 'socket', 'pickle', 'shelve', 'marshal' } def create_sandbox(self, plugin_name: str) -> dict: """创建插件沙箱环境""" safe_globals = { '__builtins__': self._create_safe_builtins(), '__name__': f'plugin_{plugin_name}', } return safe_globals def _create_safe_builtins(self) -> dict: """创建安全的内置函数""" safe_builtins = {} # 允许的安全内置函数 allowed_builtins = [ 'abs', 'all', 'any', 'bool', 'dict', 'enumerate', 'filter', 'float', 'int', 'len', 'list', 'map', 'max', 'min', 'range', 'set', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip' ] for name in allowed_builtins: if hasattr(__builtins__, name): safe_builtins[name] = getattr(__builtins__, name) return safe_builtins def execute_in_sandbox( self, code: str, sandbox_env: dict ) -> Any: """在沙箱中执行代码""" try: exec(code, sandbox_env) return True except Exception as e: print(f"沙箱执行失败: {e}") return False最佳实践:插件隔离:确保插件之间相互隔离,避免冲突依赖管理:正确处理插件依赖关系错误处理:插件错误不应影响核心系统版本兼容:支持插件版本管理和兼容性检查安全限制:限制插件的权限和资源访问文档完善:为每个插件提供清晰的文档通过完善的插件系统,可以灵活扩展 MCP 服务器的功能,满足各种定制化需求。
阅读 0·2月19日 21:39

如何在 MCP 中管理和使用提示词(Prompts)?

MCP 的提示词(Prompts)管理功能允许预定义和管理可重复使用的提示词模板,提高开发效率和一致性。以下是详细的实现方法:提示词定义MCP 提示词包含名称、描述和参数化模板:{ "name": "prompt_name", "description": "提示词描述", "arguments": [ { "name": "param1", "description": "参数1的描述", "required": true } ]}1. 提示词注册from mcp.server import Serverserver = Server("my-mcp-server")@server.prompt( name="code_review", description="代码审查提示词模板")async def code_review_prompt( code: str, language: str = "python") -> str: """生成代码审查提示词""" return f""" 请审查以下 {language} 代码,并提供改进建议: 代码: {code} 请关注以下方面: 1. 代码质量和可读性 2. 性能优化建议 3. 安全性问题 4. 最佳实践 5. 潜在的 bug """@server.prompt( name="data_analysis", description="数据分析提示词模板")async def data_analysis_prompt( data: str, analysis_type: str = "summary") -> str: """生成数据分析提示词""" return f""" 请对以下数据进行 {analysis_type} 分析: 数据: {data} 请提供: 1. 数据摘要 2. 关键洞察 3. 趋势分析 4. 可视化建议 """2. 提示词模板引擎from string import Templateimport jsonfrom typing import Dict, Anyclass PromptTemplateEngine: def __init__(self): self.templates = {} def register_template(self, name: str, template: str, parameters: list): """注册提示词模板""" self.templates[name] = { "template": template, "parameters": parameters } def render(self, name: str, **kwargs) -> str: """渲染提示词模板""" if name not in self.templates: raise ValueError(f"模板 '{name}' 不存在") template_info = self.templates[name] template = template_info["template"] # 验证必需参数 required_params = [ p["name"] for p in template_info["parameters"] if p.get("required", False) ] missing_params = [ param for param in required_params if param not in kwargs ] if missing_params: raise ValueError( f"缺少必需参数: {', '.join(missing_params)}" ) # 使用 Template 渲染 t = Template(template) return t.substitute(**kwargs) def list_templates(self) -> list: """列出所有模板""" return [ { "name": name, "description": info.get("description", ""), "parameters": info["parameters"] } for name, info in self.templates.items() ]3. 提示词版本管理from datetime import datetimefrom typing import Optionalclass PromptVersionManager: def __init__(self): self.versions = {} self.current_versions = {} def save_version( self, prompt_name: str, version: str, template: str, metadata: dict = None ): """保存提示词版本""" if prompt_name not in self.versions: self.versions[prompt_name] = {} self.versions[prompt_name][version] = { "template": template, "metadata": metadata or {}, "created_at": datetime.now().isoformat() } def set_current_version(self, prompt_name: str, version: str): """设置当前版本""" if prompt_name not in self.versions or \ version not in self.versions[prompt_name]: raise ValueError(f"版本 '{version}' 不存在") self.current_versions[prompt_name] = version def get_current_version(self, prompt_name: str) -> Optional[str]: """获取当前版本""" return self.current_versions.get(prompt_name) def get_version( self, prompt_name: str, version: str ) -> Optional[dict]: """获取指定版本""" if prompt_name not in self.versions: return None return self.versions[prompt_name].get(version) def list_versions(self, prompt_name: str) -> list: """列出所有版本""" if prompt_name not in self.versions: return [] return list(self.versions[prompt_name].keys())4. 提示词缓存from functools import lru_cacheimport hashlibclass PromptCache: def __init__(self, max_size: int = 1000): self.cache = {} self.max_size = max_size def _generate_key( self, prompt_name: str, params: dict ) -> str: """生成缓存键""" param_str = json.dumps(params, sort_keys=True) combined = f"{prompt_name}:{param_str}" return hashlib.md5(combined.encode()).hexdigest() def get(self, prompt_name: str, params: dict) -> Optional[str]: """获取缓存的提示词""" key = self._generate_key(prompt_name, params) return self.cache.get(key) def set(self, prompt_name: str, params: dict, prompt: str): """设置缓存""" # 如果缓存已满,删除最旧的条目 if len(self.cache) >= self.max_size: oldest_key = next(iter(self.cache)) del self.cache[oldest_key] key = self._generate_key(prompt_name, params) self.cache[key] = prompt def clear(self): """清空缓存""" self.cache.clear() def invalidate(self, prompt_name: str): """使指定提示词的缓存失效""" keys_to_remove = [ key for key in self.cache if key.startswith(hashlib.md5(prompt_name.encode()).hexdigest()[:16]) ] for key in keys_to_remove: del self.cache[key]5. 提示词测试和验证class PromptTester: def __init__(self, template_engine: PromptTemplateEngine): self.engine = template_engine def test_template( self, template_name: str, test_cases: list ) -> dict: """测试提示词模板""" results = { "template": template_name, "total_tests": len(test_cases), "passed": 0, "failed": 0, "errors": [] } for i, test_case in enumerate(test_cases): try: # 渲染提示词 prompt = self.engine.render( template_name, **test_case["params"] ) # 验证输出 if "expected_contains" in test_case: for expected in test_case["expected_contains"]: if expected not in prompt: results["errors"].append({ "test": i, "error": f"期望包含 '{expected}' 但未找到" }) results["failed"] += 1 break else: results["passed"] += 1 else: results["passed"] += 1 except Exception as e: results["errors"].append({ "test": i, "error": str(e) }) results["failed"] += 1 return results def validate_parameters( self, template_name: str, params: dict ) -> tuple[bool, str]: """验证参数""" try: # 尝试渲染提示词 self.engine.render(template_name, **params) return True, "" except Exception as e: return False, str(e)6. 提示词分析和优化import refrom collections import Counterclass PromptAnalyzer: def analyze(self, prompt: str) -> dict: """分析提示词""" return { "length": len(prompt), "word_count": len(prompt.split()), "sentence_count": len(re.split(r'[.!?]+', prompt)), "token_estimate": self._estimate_tokens(prompt), "complexity": self._calculate_complexity(prompt), "variables": self._extract_variables(prompt) } def _estimate_tokens(self, text: str) -> int: """估算 token 数量""" # 粗略估算:英文约 4 字符/token,中文约 1.5 字符/token chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text)) english_chars = len(re.findall(r'[a-zA-Z]', text)) return int(chinese_chars / 1.5 + english_chars / 4) def _calculate_complexity(self, prompt: str) -> float: """计算复杂度""" words = prompt.split() unique_words = set(words) # 词汇多样性 diversity = len(unique_words) / len(words) if words else 0 # 平均句子长度 sentences = re.split(r'[.!?]+', prompt) avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0 return (diversity + min(avg_sentence_length / 20, 1)) / 2 def _extract_variables(self, prompt: str) -> list: """提取变量""" return re.findall(r'\{(\w+)\}', prompt)最佳实践:参数化设计:使用参数化模板提高复用性版本控制:对重要提示词实施版本管理缓存策略:缓存渲染结果提高性能测试覆盖:编写测试用例确保提示词质量性能监控:监控提示词的 token 使用和性能文档完善:为每个提示词提供清晰的文档通过完善的提示词管理机制,可以提高 MCP 系统的开发效率和一致性。
阅读 0·2月19日 21:38

MCP 如何支持多租户架构?

MCP 的多租户支持对于企业级应用至关重要,它允许在单一 MCP 服务器实例中为多个客户或组织提供隔离的服务。以下是详细的实现方法:多租户架构设计MCP 多租户应考虑以下方面:数据隔离:确保不同租户的数据完全隔离资源隔离:隔离计算资源和配额安全隔离:实现租户级别的认证和授权性能隔离:防止单个租户影响其他租户1. 租户识别和上下文from typing import Optionalfrom dataclasses import dataclass@dataclassclass TenantContext: """租户上下文""" tenant_id: str tenant_name: str user_id: str permissions: list quotas: dictclass TenantContextManager: def __init__(self): self.contexts = {} def create_context( self, tenant_id: str, tenant_name: str, user_id: str, permissions: list, quotas: dict = None ) -> TenantContext: """创建租户上下文""" context = TenantContext( tenant_id=tenant_id, tenant_name=tenant_name, user_id=user_id, permissions=permissions, quotas=quotas or self._get_default_quotas(tenant_id) ) self.contexts[tenant_id] = context return context def get_context(self, tenant_id: str) -> Optional[TenantContext]: """获取租户上下文""" return self.contexts.get(tenant_id) def set_current_context(self, tenant_id: str): """设置当前租户上下文""" context = self.get_context(tenant_id) if not context: raise ValueError(f"租户 {tenant_id} 不存在") # 使用线程本地存储或异步上下文变量 import contextvars current_tenant.set(context) def _get_default_quotas(self, tenant_id: str) -> dict: """获取默认配额""" return { "max_tools": 100, "max_resources": 1000, "max_requests_per_minute": 1000, "max_storage_mb": 1024 }# 当前租户上下文变量current_tenant = contextvars.ContextVar('current_tenant', default=None)2. 数据隔离from sqlalchemy import create_engine, Column, String, Integer, Textfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_sessionBase = declarative_base()class TenantData(Base): """租户数据表""" __tablename__ = 'tenant_data' id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, index=True) data_key = Column(String(100), nullable=False) data_value = Column(Text) __table_args__ = ( # 确保租户隔离 Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True), )class MultiTenantDatabase: def __init__(self, database_url: str): self.engine = create_engine(database_url) Base.metadata.create_all(self.engine) self.SessionLocal = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=self.engine) ) def get_session(self, tenant_id: str): """获取租户专属的数据库会话""" session = self.SessionLocal() # 添加租户过滤器 from sqlalchemy import event @event.listens_for(session, 'before_flush') def add_tenant_filter(session, context, instances): for instance in session.new: if hasattr(instance, 'tenant_id'): instance.tenant_id = tenant_id return session def query_tenant_data( self, tenant_id: str, data_key: str ) -> Optional[str]: """查询租户数据""" session = self.get_session(tenant_id) try: result = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() return result.data_value if result else None finally: session.close() def save_tenant_data( self, tenant_id: str, data_key: str, data_value: str ): """保存租户数据""" session = self.get_session(tenant_id) try: existing = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() if existing: existing.data_value = data_value else: new_data = TenantData( tenant_id=tenant_id, data_key=data_key, data_value=data_value ) session.add(new_data) session.commit() except Exception as e: session.rollback() raise e finally: session.close()3. 资源配额管理from collections import defaultdictimport timeclass QuotaManager: def __init__(self): self.quotas = {} self.usage = defaultdict(lambda: defaultdict(int)) self.rate_limits = {} def set_quota( self, tenant_id: str, quota_type: str, limit: int ): """设置租户配额""" if tenant_id not in self.quotas: self.quotas[tenant_id] = {} self.quotas[tenant_id][quota_type] = limit def check_quota( self, tenant_id: str, quota_type: str, amount: int = 1 ) -> bool: """检查配额是否足够""" if tenant_id not in self.quotas: return True limit = self.quotas[tenant_id].get(quota_type) if limit is None: return True current_usage = self.usage[tenant_id][quota_type] return current_usage + amount <= limit def consume_quota( self, tenant_id: str, quota_type: str, amount: int = 1 ) -> bool: """消耗配额""" if not self.check_quota(tenant_id, quota_type, amount): return False self.usage[tenant_id][quota_type] += amount return True def get_usage( self, tenant_id: str, quota_type: str ) -> int: """获取使用量""" return self.usage[tenant_id][quota_type] def reset_usage(self, tenant_id: str): """重置使用量""" if tenant_id in self.usage: self.usage[tenant_id].clear() def check_rate_limit( self, tenant_id: str, window: int = 60, max_requests: int = 100 ) -> bool: """检查速率限制""" now = time.time() if tenant_id not in self.rate_limits: self.rate_limits[tenant_id] = [] # 清理过期的请求记录 self.rate_limits[tenant_id] = [ timestamp for timestamp in self.rate_limits[tenant_id] if now - timestamp < window ] # 检查是否超过限制 if len(self.rate_limits[tenant_id]) >= max_requests: return False # 记录新请求 self.rate_limits[tenant_id].append(now) return True4. 租户级别的工具和资源from mcp.server import Serverfrom functools import wrapsclass MultiTenantServer(Server): def __init__(self, name: str, tenant_manager: TenantContextManager): super().__init__(name) self.tenant_manager = tenant_manager self.tenant_tools = defaultdict(dict) self.tenant_resources = defaultdict(dict) def tenant_tool( self, name: str, description: str, tenant_id: str = None ): """租户专属工具装饰器""" def decorator(func): # 注册工具 self.tenant_tools[tenant_id or "default"][name] = { "function": func, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # 获取当前租户 context = current_tenant.get() if not context: raise PermissionError("未找到租户上下文") # 检查租户权限 if tenant_id and context.tenant_id != tenant_id: raise PermissionError("无权访问此工具") # 执行工具 return await func(*args, **kwargs) return wrapper return decorator def tenant_resource( self, uri: str, name: str, description: str, tenant_id: str = None ): """租户专属资源装饰器""" def decorator(func): # 注册资源 self.tenant_resources[tenant_id or "default"][uri] = { "function": func, "name": name, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # 获取当前租户 context = current_tenant.get() if not context: raise PermissionError("未找到租户上下文") # 检查租户权限 if tenant_id and context.tenant_id != tenant_id: raise PermissionError("无权访问此资源") # 执行资源 return await func(*args, **kwargs) return wrapper return decorator async def list_tools(self, tenant_id: str = None) -> list: """列出可用工具""" context = current_tenant.get() if not context: return [] # 获取默认工具和租户专属工具 tools = [] # 添加默认工具 for name, tool_info in self.tenant_tools["default"].items(): tools.append({ "name": name, "description": tool_info["description"] }) # 添加租户专属工具 if context.tenant_id in self.tenant_tools: for name, tool_info in self.tenant_tools[context.tenant_id].items(): tools.append({ "name": name, "description": tool_info["description"] }) return tools5. 租户认证和授权import jwtfrom datetime import datetime, timedeltafrom typing import Dict, Anyclass TenantAuthenticator: def __init__(self, secret_key: str): self.secret_key = secret_key def generate_token( self, tenant_id: str, user_id: str, permissions: list, expires_in: int = 3600 ) -> str: """生成租户令牌""" payload = { "tenant_id": tenant_id, "user_id": user_id, "permissions": permissions, "exp": datetime.utcnow() + timedelta(seconds=expires_in), "iat": datetime.utcnow() } token = jwt.encode(payload, self.secret_key, algorithm="HS256") return token def verify_token(self, token: str) -> Dict[str, Any]: """验证租户令牌""" try: payload = jwt.decode(token, self.secret_key, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise ValueError("令牌已过期") except jwt.InvalidTokenError: raise ValueError("无效的令牌") def check_permission( self, token: str, required_permission: str ) -> bool: """检查权限""" payload = self.verify_token(token) permissions = payload.get("permissions", []) return required_permission in permissions or "admin" in permissions6. 租户监控和报告from collections import defaultdictfrom datetime import datetime, timedeltaclass TenantMonitor: def __init__(self): self.metrics = defaultdict(lambda: defaultdict(list)) def record_metric( self, tenant_id: str, metric_name: str, value: float ): """记录指标""" timestamp = datetime.now() self.metrics[tenant_id][metric_name].append({ "value": value, "timestamp": timestamp }) # 限制历史记录大小 if len(self.metrics[tenant_id][metric_name]) > 1000: self.metrics[tenant_id][metric_name] = \ self.metrics[tenant_id][metric_name][-1000:] def get_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> list: """获取指标""" if tenant_id not in self.metrics: return [] if metric_name not in self.metrics[tenant_id]: return [] records = self.metrics[tenant_id][metric_name] if since: records = [ record for record in records if record["timestamp"] >= since ] return records def get_aggregated_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> dict: """获取聚合指标""" records = self.get_metrics(tenant_id, metric_name, since) if not records: return {} values = [record["value"] for record in records] return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values) } def generate_tenant_report( self, tenant_id: str, since: datetime = None ) -> dict: """生成租户报告""" if not since: since = datetime.now() - timedelta(days=7) report = { "tenant_id": tenant_id, "period": { "start": since, "end": datetime.now() }, "metrics": {} } if tenant_id in self.metrics: for metric_name in self.metrics[tenant_id]: report["metrics"][metric_name] = \ self.get_aggregated_metrics(tenant_id, metric_name, since) return report最佳实践:数据隔离:使用租户 ID 作为所有数据表的主键或索引配额管理:为每个租户设置合理的资源配额权限控制:实施细粒度的租户级权限控制性能监控:监控每个租户的资源使用情况安全审计:记录所有租户操作用于审计弹性扩展:根据租户需求动态扩展资源通过完善的多租户支持,可以在单一 MCP 服务器实例中为多个客户或组织提供隔离、安全、高效的服务。
阅读 0·2月19日 21:38

MCP 的消息格式是怎样的?有哪些常用的消息类型?

MCP 的消息格式基于 JSON-RPC 2.0 协议,并进行了扩展以支持 AI 模型与外部系统的交互。以下是详细的消息格式说明:基础消息结构所有 MCP 消息都遵循 JSON-RPC 2.0 的基本格式:{ "jsonrpc": "2.0", "id": "unique-request-id", "method": "method-name", "params": { ... }}1. 请求消息(Request)用于客户端向服务器发送工具调用请求:{ "jsonrpc": "2.0", "id": "req-123", "method": "tools/call", "params": { "name": "calculate", "arguments": { "expression": "2 + 2" } }}2. 响应消息(Response)服务器返回执行结果:{ "jsonrpc": "2.0", "id": "req-123", "result": { "content": [ { "type": "text", "text": "结果: 4" } ] }}3. 错误响应(Error Response)当请求失败时返回:{ "jsonrpc": "2.0", "id": "req-123", "error": { "code": -32602, "message": "Invalid params", "data": { "details": "参数 'expression' 不能为空" } }}4. 通知消息(Notification)服务器主动推送的消息(无需响应):{ "jsonrpc": "2.0", "method": "notifications/progress", "params": { "progress": 0.5, "message": "处理中..." }}常用方法类型tools/list - 获取可用工具列表{ "jsonrpc": "2.0", "id": "req-001", "method": "tools/list"}resources/list - 获取可用资源列表{ "jsonrpc": "2.0", "id": "req-002", "method": "resources/list"}resources/read - 读取资源内容{ "jsonrpc": "2.0", "id": "req-003", "method": "resources/read", "params": { "uri": "file:///data/config.json" }}prompts/list - 获取提示词列表{ "jsonrpc": "2.0", "id": "req-004", "method": "prompts/list"}错误代码MCP 定义了标准的错误代码:| 代码 | 名称 | 描述 ||------|------|------|| -32700 | Parse error | JSON 解析错误 || -32600 | Invalid Request | 无效的请求 || -32601 | Method not found | 方法不存在 || -32602 | Invalid params | 无效的参数 || -32603 | Internal error | 内部错误 || -32000 | Server error | 服务器错误 |内容类型(Content Types)MCP 支持多种内容类型:{ "type": "text", "text": "纯文本内容"}{ "type": "image", "data": "base64-encoded-image-data", "mimeType": "image/png"}{ "type": "resource", "uri": "file:///data/report.pdf", "mimeType": "application/pdf"}消息流(Message Streaming)对于长时间运行的操作,支持流式响应:{ "jsonrpc": "2.0", "id": "req-005", "method": "tools/call", "params": { "name": "generate_report", "arguments": { "stream": true } }}最佳实践:唯一 ID:每个请求必须有唯一的 ID类型验证:严格验证参数类型和格式错误处理:提供详细的错误信息和数据超时处理:实现请求超时机制日志记录:记录所有消息用于调试和审计理解 MCP 的消息格式对于实现兼容的服务器和客户端至关重要。
阅读 0·2月19日 21:35