MCP 的错误处理和重试机制如何实现?
# MCP error handling and retry mechanisms are essential for keeping the system
# stable and reliable. What follows is a detailed error-handling strategy and
# a retry-mechanism implementation.
#
# Error-handling architecture
# MCP error handling should take the following aspects into account:
#   - Error classification: distinguish between different kinds of errors
#   - Error propagation: propagate error information correctly
#   - Error recovery: implement error-recovery mechanisms
#   - Retry strategy: retry intelligently
#   - Circuit breaking: prevent cascading failures
#   - Degradation strategy: provide a degraded service during failures
#
# 1. Error classification and definitions

from enum import Enum
from typing import Optional, Dict, Any
from dataclasses import dataclass


class ErrorType(Enum):
    """Category of an MCP error."""

    VALIDATION_ERROR = "validation_error"
    AUTHENTICATION_ERROR = "authentication_error"
    AUTHORIZATION_ERROR = "authorization_error"
    NOT_FOUND_ERROR = "not_found_error"
    CONFLICT_ERROR = "conflict_error"
    RATE_LIMIT_ERROR = "rate_limit_error"
    INTERNAL_ERROR = "internal_error"
    EXTERNAL_SERVICE_ERROR = "external_service_error"
    TIMEOUT_ERROR = "timeout_error"
    NETWORK_ERROR = "network_error"


class ErrorSeverity(Enum):
    """Severity level of an MCP error."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# eq=False: a plain @dataclass generates __eq__ and therefore sets
# __hash__ = None, which would make every MCP error unhashable (it could not
# be put in a set or used as a dict key). Errors compare by identity instead.
@dataclass(eq=False)
class MCPError(Exception):
    """Base class for all MCP errors.

    Attributes:
        error_type: Category of the error.
        message: Human-readable message; also used as the exception text.
        code: Numeric status code (the subclasses use HTTP-style codes).
        details: Extra structured data; ``None`` is replaced by an empty dict.
        severity: Severity level of the error.
        retryable: Whether retrying the failed operation may succeed.
    """

    error_type: ErrorType
    message: str
    code: int
    details: Optional[Dict[str, Any]] = None
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
    retryable: bool = False

    def __post_init__(self):
        # Give every instance its own dict instead of sharing a default.
        if self.details is None:
            self.details = {}
        # Initialise the Exception part so str(error) yields the message.
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict representation with enum members reduced to values."""
        return {
            "error_type": self.error_type.value,
            "message": self.message,
            "code": self.code,
            "details": self.details,
            "severity": self.severity.value,
            "retryable": self.retryable,
        }


class ValidationError(MCPError):
    """Validation error (code 400, low severity, not retryable)."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
        super().__init__(
            error_type=ErrorType.VALIDATION_ERROR,
            message=message,
            code=400,
            details=details,
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class AuthenticationError(MCPError):
    """Authentication error (code 401, high severity, not retryable)."""

    def __init__(self, message: str = "Authentication failed"):
        super().__init__(
            error_type=ErrorType.AUTHENTICATION_ERROR,
            message=message,
            code=401,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class AuthorizationError(MCPError):
    """Authorization error (code 403, high severity, not retryable)."""

    def __init__(self, message: str = "Access denied"):
        super().__init__(
            error_type=ErrorType.AUTHORIZATION_ERROR,
            message=message,
            code=403,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class NotFoundError(MCPError):
    """Not-found error (code 404, low severity, not retryable)."""

    def __init__(self, resource: str, identifier: str):
        super().__init__(
            error_type=ErrorType.NOT_FOUND_ERROR,
            message=f"{resource} not found: {identifier}",
            code=404,
            details={"resource": resource, "identifier": identifier},
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class RateLimitError(MCPError):
    """Rate-limit error (code 429, retryable).

    ``retry_after`` is stored under ``details["retry_after"]``.
    """

    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(
            error_type=ErrorType.RATE_LIMIT_ERROR,
            message=message,
            code=429,
            details={"retry_after": retry_after},
            severity=ErrorSeverity.MEDIUM,
            retryable=True,
        )


class InternalError(MCPError):
    """Internal error (code 500, critical severity, retryable)."""

    def __init__(self, message: str = "Internal server error"):
        super().__init__(
            error_type=ErrorType.INTERNAL_ERROR,
            message=message,
            code=500,
            severity=ErrorSeverity.CRITICAL,
            retryable=True,
        )


class ExternalServiceError(MCPError):
    """External-service error (code 502, high severity, retryable)."""

    def __init__(self, service: str, message: str):
        super().__init__(
            error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
            message=f"{service} error: {message}",
            code=502,
            details={"service": service},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )


# NOTE: this class shadows the builtin ``TimeoutError`` for all code that
# follows in this module. The name is kept for backward compatibility; use
# ``builtins.TimeoutError`` when the builtin is meant.
class TimeoutError(MCPError):
    """Timeout error (code 504, high severity, retryable)."""

    def __init__(self, operation: str, timeout: float):
        super().__init__(
            error_type=ErrorType.TIMEOUT_ERROR,
            message=f"{operation} timed out after {timeout}s",
            code=504,
            details={"operation": operation, "timeout": timeout},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )


# 2.
# Error handler

from typing import Callable, Optional, Dict, Any, List
import builtins
import logging
import traceback
from datetime import datetime
from functools import wraps


class ErrorHandler:
    """Logs, reports and converts errors, then dispatches to typed handlers."""

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)
        self.error_handlers: Dict["ErrorType", Callable] = {}
        self.error_reporters: List[Callable] = []

    def register_handler(self, error_type: "ErrorType", handler: Callable):
        """Register the handler for one error type (replaces any previous one).

        The handler is awaited as ``handler(mcp_error, context)``.
        """
        self.error_handlers[error_type] = handler

    def register_reporter(self, reporter: Callable):
        """Register a reporter; it is awaited as ``reporter(error, context)``."""
        self.error_reporters.append(reporter)

    async def handle_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Log and report ``error``, then build a response dict for it.

        Returns the registered handler's result for the error's type, or the
        error's ``to_dict()`` when no handler is registered or the handler
        itself raises.
        """
        await self._log_error(error, context)
        await self._report_error(error, context)

        mcp_error = self._convert_to_mcp_error(error)

        handler = self.error_handlers.get(mcp_error.error_type)
        if handler is not None:
            try:
                return await handler(mcp_error, context)
            except Exception as e:
                # A failing handler must not mask the error being handled;
                # fall through to the default response below.
                self.logger.error(f"错误处理器失败: {e}")

        return mcp_error.to_dict()

    async def _log_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Write the error to the logger."""
        if isinstance(error, MCPError):
            self.logger.error(
                f"MCP Error: {error.error_type.value} - {error.message}",
                extra={
                    "error_code": error.code,
                    "error_details": error.details,
                    "context": context,
                },
            )
        else:
            # Pass the exception object itself: exc_info=True only works while
            # an exception is being handled, and this method may be called
            # outside any ``except`` block.
            self.logger.error(
                f"Unexpected error: {str(error)}",
                exc_info=error,
                extra={"context": context},
            )

    async def _report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Send the error to every registered reporter; failures are logged."""
        for reporter in self.error_reporters:
            try:
                await reporter(error, context)
            except Exception as e:
                self.logger.error(f"错误报告器失败: {e}")

    def _convert_to_mcp_error(self, error: Exception) -> "MCPError":
        """Map an arbitrary exception onto an MCP error."""
        if isinstance(error, MCPError):
            return error

        if isinstance(error, ValueError):
            return ValidationError(str(error))
        if isinstance(error, PermissionError):
            return AuthorizationError(str(error))
        # The module-level name ``TimeoutError`` is the MCP error class, which
        # shadows the builtin, so the builtin (and asyncio's, a distinct class
        # before Python 3.11) must be referenced explicitly here.
        if isinstance(error, (builtins.TimeoutError, asyncio.TimeoutError)):
            return TimeoutError("operation", 0)
        return InternalError(str(error))


# Error reporter example
class ErrorReporter:
    """Posts errors as JSON to an external error service."""

    def __init__(
        self,
        error_service_url: str,
        logger: Optional[logging.Logger] = None,
    ):
        self.error_service_url = error_service_url
        # report_error logs its own failures, so a logger is always needed.
        self.logger = logger or logging.getLogger(__name__)

    async def report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """POST the error to the error service; failures are only logged."""
        import aiohttp

        error_data = {
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
            "timestamp": datetime.now().isoformat(),
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.error_service_url,
                    json=error_data,
                ) as response:
                    if response.status != 200:
                        self.logger.error(
                            f"报告错误失败: {response.status}"
                        )
        except Exception as e:
            self.logger.error(f"报告错误失败: {e}")


# 3. Retry mechanism

import asyncio
from typing import Callable, Optional, Type
import time


def _is_retryable(error: Exception) -> bool:
    """Return an MCP error's ``retryable`` flag; other exceptions are retryable."""
    if isinstance(error, MCPError):
        return error.retryable
    return True


class RetryStrategy:
    """Base class for retry strategies.

    ``attempt`` is the 1-based number of the attempt that has just failed.
    """

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Decide whether another attempt should be made."""
        raise NotImplementedError

    async def get_delay(self, attempt: int) -> float:
        """Return the number of seconds to wait before the next attempt."""
        raise NotImplementedError


class FixedDelayRetry(RetryStrategy):
    """Retry with a constant delay between attempts."""

    def __init__(self, max_attempts: int = 3, delay: float = 1.0):
        self.max_attempts = max_attempts
        self.delay = delay

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error is retryable."""
        if attempt >= self.max_attempts:
            return False
        return _is_retryable(error)

    async def get_delay(self, attempt: int) -> float:
        """Return the fixed delay."""
        return self.delay


class ExponentialBackoffRetry(RetryStrategy):
    """Retry with an exponentially growing delay, capped at ``max_delay``."""

    def __init__(
        self,
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0,
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error is retryable."""
        if attempt >= self.max_attempts:
            return False
        return _is_retryable(error)

    async def get_delay(self, attempt: int) -> float:
        """Return ``initial_delay * backoff_factor ** (attempt - 1)``, capped.

        RetryManager passes 1-based attempt numbers, so the exponent is
        ``attempt - 1``: the first retry waits exactly ``initial_delay``.
        """
        exponent = max(attempt - 1, 0)
        delay = self.initial_delay * (self.backoff_factor ** exponent)
        return min(delay, self.max_delay)


class RetryManager:
    """Runs a coroutine function under a retry strategy."""

    def __init__(self, retry_strategy: RetryStrategy):
        self.retry_strategy = retry_strategy

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)``, retrying as the strategy allows.

        Returns the first successful result. When the strategy refuses a
        retry, the exception from the last attempt is re-raised unchanged.
        """
        attempt = 0

        while True:
            attempt += 1
            try:
                return await func(*args, **kwargs)
            except Exception as error:
                if not await self.retry_strategy.should_retry(attempt, error):
                    # Bare raise keeps the original traceback intact.
                    raise
                delay = await self.retry_strategy.get_delay(attempt)

            # Sleep outside the except block so the failed attempt's exception
            # is released while waiting.
            await asyncio.sleep(delay)


# Retry decorator
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
):
    """Decorate a coroutine function with exponential-backoff retries."""

    def decorator(func: Callable):
        retry_strategy = ExponentialBackoffRetry(
            max_attempts=max_attempts,
            initial_delay=delay,
            max_delay=max_delay,
            backoff_factor=backoff_factor,
        )
        retry_manager = RetryManager(retry_strategy)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_manager.execute_with_retry(func, *args, **kwargs)

        return wrapper

    return decorator


# 4.
# Circuit breaker

from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, Optional
import asyncio
import time


class CircuitState(Enum):
    """Circuit breaker state."""

    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped: calls are rejected
    HALF_OPEN = "half_open"  # probing whether the callee has recovered


class CircuitBreaker:
    """Circuit breaker guarding calls to a coroutine function.

    Opens after ``failure_threshold`` failures, rejects calls for ``timeout``
    seconds, then lets calls through in HALF_OPEN until ``success_threshold``
    of them have succeeded, at which point it closes again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.lock = asyncio.Lock()

    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Raises:
            MCPError: code 503, retryable, when the breaker is OPEN and the
                timeout has not elapsed yet.
            Exception: whatever ``func`` raises, after recording the failure.
        """
        async with self.lock:
            if self.state == CircuitState.OPEN:
                # A missing timestamp is treated as "timeout elapsed" so an
                # externally forced OPEN state cannot raise a TypeError here.
                elapsed = (
                    self.last_failure_time is None
                    or time.time() - self.last_failure_time > self.timeout
                )
                if not elapsed:
                    raise MCPError(
                        error_type=ErrorType.INTERNAL_ERROR,
                        message="Circuit breaker is OPEN",
                        code=503,
                        retryable=True,
                    )
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0

        try:
            result = await func(*args, **kwargs)
        except Exception:
            async with self.lock:
                self._record_failure()
            raise

        async with self.lock:
            self._record_success()
        return result

    def _record_success(self) -> None:
        """Update counters after a successful call (caller holds the lock)."""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def _record_failure(self) -> None:
        """Update counters after a failed call (caller holds the lock)."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed probe in HALF_OPEN re-opens the breaker at once; this is
        # stated explicitly rather than relying on a stale failure_count.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

    def get_state(self) -> CircuitState:
        """Return the current breaker state."""
        return self.state

    def reset(self):
        """Return the breaker to its initial CLOSED state.

        This is a plain synchronous method, so it cannot use ``async with``
        on the lock. It contains no ``await``, so no other coroutine can run
        between these assignments on the same event loop.
        """
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None


# Circuit breaker decorator
def circuit_breaker(
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 60.0,
):
    """Decorate a coroutine function with its own CircuitBreaker instance."""

    def decorator(func: Callable):
        breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            success_threshold=success_threshold,
            timeout=timeout,
        )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await breaker.execute(func, *args, **kwargs)

        return wrapper

    return decorator


# 5. Degradation (fallback) strategy


class FallbackStrategy:
    """Base class for fallback strategies."""

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Produce a substitute result for a failed call."""
        raise NotImplementedError


class CacheFallback(FallbackStrategy):
    """Fallback that serves a previously cached value."""

    def __init__(self, cache: Dict[str, Any]):
        self.cache = cache

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return ``cache[context["cache_key"]]``; re-raise ``error`` on a miss."""
        cache_key = context.get("cache_key") if context else None

        if cache_key and cache_key in self.cache:
            return self.cache[cache_key]

        raise error


class DefaultFallback(FallbackStrategy):
    """Fallback that returns a fixed default value."""

    def __init__(self, default_value: Any):
        self.default_value = default_value

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return the default value (the same object on every call)."""
        return self.default_value


class FallbackManager:
    """Selects and runs a fallback strategy according to the error type."""

    def __init__(self):
        self.fallback_strategies: Dict[ErrorType, FallbackStrategy] = {}
        self.default_fallback: Optional[FallbackStrategy] = None

    def register_fallback(self, error_type: ErrorType, fallback: FallbackStrategy):
        """Register the fallback strategy for one error type."""
        self.fallback_strategies[error_type] = fallback

    def set_default_fallback(self, fallback: FallbackStrategy):
        """Set the strategy used when no type-specific one is registered."""
        self.default_fallback = fallback

    async def execute_with_fallback(
        self,
        func: Callable,
        context: Optional[Dict[str, Any]] = None,
        *args,
        **kwargs
    ) -> Any:
        """Await ``func(*args, **kwargs)``, degrading to a fallback on failure.

        A non-MCP exception is wrapped in ``InternalError`` with the original
        attached as its cause. If no strategy applies the (possibly wrapped)
        error is raised; exceptions raised by the strategy propagate.
        """
        try:
            return await func(*args, **kwargs)
        except Exception as original:
            if isinstance(original, MCPError):
                error = original
            else:
                error = InternalError(str(original))
                # Keep the real failure reachable from the wrapped error.
                error.__cause__ = original

            strategy = self.fallback_strategies.get(
                error.error_type,
                self.default_fallback,
            )

            if strategy is None:
                if error is original:
                    raise
                raise error from original

            return await strategy.execute_fallback(error, context)


# Fallback decorator
def fallback(
    error_type: ErrorType = None,
    default_value: Any = None,
):
    """Decorate a coroutine function so one error type yields a default value.

    A fallback is registered only when both ``error_type`` and a non-None
    ``default_value`` are given; otherwise errors propagate (non-MCP errors
    wrapped in ``InternalError``).
    """

    def decorator(func: Callable):
        fallback_manager = FallbackManager()

        if error_type and default_value is not None:
            fallback_manager.register_fallback(
                error_type,
                DefaultFallback(default_value),
            )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await fallback_manager.execute_with_fallback(
                func,
                None,
                *args,
                **kwargs
            )

        return wrapper

    return decorator


# 6. Comprehensive error-handling example

from mcp.server import Server


class RobustMCPServer(Server):
    """MCP server wired up with the error-handling components above."""

    def __init__(self, name: str):
        super().__init__(name)

        # Initialise the error-handling components.
        self.error_handler = ErrorHandler()
        self.retry_manager = RetryManager(ExponentialBackoffRetry())
        self.circuit_breaker = CircuitBreaker()
        self.fallback_manager = FallbackManager()

        # Configure error handling.
        self._setup_error_handling()

    def _setup_error_handling(self):
        """Register the error handlers and fallback strategies."""
        self.error_handler.register_handler(
            ErrorType.VALIDATION_ERROR,
            self._handle_validation_error,
        )
        self.error_handler.register_handler(
            ErrorType.RATE_LIMIT_ERROR,
            self._handle_rate_limit_error,
        )

        self.fallback_manager.register_fallback(
            ErrorType.EXTERNAL_SERVICE_ERROR,
            CacheFallback({}),
        )

    async def _handle_validation_error(
        self,
        error: ValidationError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build the response for a validation error, including suggestions."""
        return {
            "error": error.to_dict(),
            "suggestions": self._get_validation_suggestions(error.details),
        }

    async def _handle_rate_limit_error(
        self,
        error: RateLimitError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build the response for a rate-limit error, including the wait time."""
        retry_after = error.details.get("retry_after", 60)

        return {
            "error": error.to_dict(),
            "retry_after": retry_after,
            "message": f"请等待 {retry_after} 秒后重试",
        }

    def _get_validation_suggestions(self, details: Dict[str, Any]) -> List[str]:
        """Return suggestions for fixing a validation error."""
        suggestions = []

        # Provide suggestions based on the error details.
        # ...

        return suggestions

    # Decorators apply bottom-up: the fallback wraps the call directly, the
    # circuit breaker wraps the fallback, and retry wraps both. The breaker
    # and retry therefore only see errors the fallback does not absorb.
    @retry(max_attempts=3, delay=1.0)
    @circuit_breaker(failure_threshold=5, timeout=60.0)
    @fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={})
    async def call_external_service(
        self,
        service_url: str,
        params: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Call an external service, converting failures to ExternalServiceError."""
        try:
            # Call the external service.
            # ...
            pass
        except Exception as error:
            # Convert to an MCP error, keeping the original as the cause.
            raise ExternalServiceError("external", str(error)) from error


# Best practices:
#   - Error classification: classify errors correctly so each kind can be
#     handled specifically
#   - Retry strategy: choose a retry strategy that suits the error type
#   - Circuit breaking: prevent cascading failures and protect system stability
#   - Degradation strategy: provide a degraded service during failures so that
#     basic functionality stays available
#   - Error logging: record error details thoroughly to ease troubleshooting
#   - Monitoring and alerting: monitor the error rate to catch problems early
#
# With thorough error handling and retry mechanisms in place, the stability
# and reliability of an MCP system can be ensured.