乐闻世界logo
搜索文章和话题

How does MCP implement error handling and retry mechanisms?

2月21日 15:51

Error handling and retry mechanisms for MCP are crucial for ensuring system stability and reliability. Here are detailed error handling strategies and retry mechanism implementations:

Error Handling Architecture

MCP error handling should consider the following aspects:

  1. Error Classification: Distinguish between different types of errors
  2. Error Propagation: Properly propagate error information
  3. Error Recovery: Implement error recovery mechanisms
  4. Retry Strategy: Intelligent retry strategies
  5. Circuit Breaker: Prevent cascading failures
  6. Fallback Strategy: Provide degraded services during failures

1. Error Classification and Definition

python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class ErrorType(Enum):
    """Category of an MCP error; each value is the string used in `to_dict()`."""

    VALIDATION_ERROR = "validation_error"
    AUTHENTICATION_ERROR = "authentication_error"
    AUTHORIZATION_ERROR = "authorization_error"
    NOT_FOUND_ERROR = "not_found_error"
    CONFLICT_ERROR = "conflict_error"
    RATE_LIMIT_ERROR = "rate_limit_error"
    INTERNAL_ERROR = "internal_error"
    EXTERNAL_SERVICE_ERROR = "external_service_error"
    TIMEOUT_ERROR = "timeout_error"
    NETWORK_ERROR = "network_error"


class ErrorSeverity(Enum):
    """Severity level attached to an MCP error."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class MCPError(Exception):
    """Base class for all MCP errors.

    Attributes:
        error_type: Category of the error.
        message: Human-readable description; also becomes ``str(error)``.
        code: Numeric status code (the subclasses use HTTP-style codes).
        details: Extra structured data; ``None`` is replaced by a fresh dict.
        severity: Severity level, ``MEDIUM`` by default.
        retryable: Whether retrying the failed operation may succeed.
    """

    error_type: ErrorType
    message: str
    code: int
    details: Optional[Dict[str, Any]] = None
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
    retryable: bool = False

    def __post_init__(self):
        # A per-instance dict avoids sharing one mutable default.
        if self.details is None:
            self.details = {}
        # Populate Exception.args so str(error) yields the message.
        super().__init__(self.message)

    def __hash__(self) -> int:
        # @dataclass generates __eq__ and would otherwise set __hash__ to
        # None, making every MCP exception unhashable. Hash the scalar fields
        # only (details is a dict), which stays consistent with __eq__.
        return hash(
            (
                type(self),
                self.error_type,
                self.message,
                self.code,
                self.severity,
                self.retryable,
            )
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation with enum fields as strings."""
        return {
            "error_type": self.error_type.value,
            "message": self.message,
            "code": self.code,
            "details": self.details,
            "severity": self.severity.value,
            "retryable": self.retryable,
        }


class ValidationError(MCPError):
    """Invalid input (code 400, low severity, not retryable)."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
        super().__init__(
            error_type=ErrorType.VALIDATION_ERROR,
            message=message,
            code=400,
            details=details,
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class AuthenticationError(MCPError):
    """Authentication failure (code 401, high severity, not retryable)."""

    def __init__(self, message: str = "Authentication failed"):
        super().__init__(
            error_type=ErrorType.AUTHENTICATION_ERROR,
            message=message,
            code=401,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class AuthorizationError(MCPError):
    """Authorization failure (code 403, high severity, not retryable)."""

    def __init__(self, message: str = "Access denied"):
        super().__init__(
            error_type=ErrorType.AUTHORIZATION_ERROR,
            message=message,
            code=403,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class NotFoundError(MCPError):
    """Missing resource (code 404, low severity, not retryable)."""

    def __init__(self, resource: str, identifier: str):
        super().__init__(
            error_type=ErrorType.NOT_FOUND_ERROR,
            message=f"{resource} not found: {identifier}",
            code=404,
            details={"resource": resource, "identifier": identifier},
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class RateLimitError(MCPError):
    """Rate limit hit (code 429, retryable); ``details`` holds ``retry_after``."""

    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(
            error_type=ErrorType.RATE_LIMIT_ERROR,
            message=message,
            code=429,
            details={"retry_after": retry_after},
            severity=ErrorSeverity.MEDIUM,
            retryable=True,
        )


class InternalError(MCPError):
    """Internal failure (code 500, critical severity, retryable)."""

    def __init__(self, message: str = "Internal server error"):
        super().__init__(
            error_type=ErrorType.INTERNAL_ERROR,
            message=message,
            code=500,
            severity=ErrorSeverity.CRITICAL,
            retryable=True,
        )


class ExternalServiceError(MCPError):
    """Failure of a named external service (code 502, retryable)."""

    def __init__(self, service: str, message: str):
        super().__init__(
            error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
            message=f"{service} error: {message}",
            code=502,
            details={"service": service},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )


# NOTE: this class deliberately keeps its original name, which shadows the
# built-in ``TimeoutError`` in any module that defines or imports it. Code that
# needs the built-in must reference ``builtins.TimeoutError`` explicitly.
class TimeoutError(MCPError):
    """Operation timeout (code 504, high severity, retryable)."""

    def __init__(self, operation: str, timeout: float):
        super().__init__(
            error_type=ErrorType.TIMEOUT_ERROR,
            message=f"{operation} timed out after {timeout}s",
            code=504,
            details={"operation": operation, "timeout": timeout},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )

2. Error Handler

python
import asyncio
import builtins
import logging
import traceback
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

# Built-in timeout types, looked up through ``builtins`` because the MCP error
# hierarchy defines its own ``TimeoutError`` that shadows the built-in name.
_BUILTIN_TIMEOUT_ERRORS = (builtins.TimeoutError, asyncio.TimeoutError)


class ErrorHandler:
    """Logs, reports and converts exceptions into MCP error responses.

    Handlers are registered per ``ErrorType`` and awaited as
    ``handler(mcp_error, context)``. Reporters are awaited as
    ``reporter(error, context)`` for every handled error.
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)
        self.error_handlers: Dict["ErrorType", Callable] = {}
        self.error_reporters: List[Callable] = []

    def register_handler(self, error_type: "ErrorType", handler: Callable):
        """Register the async handler for `error_type`, replacing any previous one."""
        self.error_handlers[error_type] = handler

    def register_reporter(self, reporter: Callable):
        """Add an async reporter that is called for every handled error."""
        self.error_reporters.append(reporter)

    async def handle_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Log, report and convert `error` into a response dictionary.

        Returns the registered handler's result when one exists for the
        error's type and succeeds; otherwise returns ``mcp_error.to_dict()``.
        """
        await self._log_error(error, context)
        await self._report_error(error, context)

        mcp_error = self._convert_to_mcp_error(error)

        handler = self.error_handlers.get(mcp_error.error_type)
        if handler is not None:
            try:
                return await handler(mcp_error, context)
            except Exception as e:
                # A broken handler must not mask the original error, so fall
                # through to the default response below.
                self.logger.error(f"Error handler failed: {e}")

        return mcp_error.to_dict()

    async def _log_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Log `error`; non-MCP errors are logged with their traceback."""
        if isinstance(error, MCPError):
            self.logger.error(
                f"MCP Error: {error.error_type.value} - {error.message}",
                extra={
                    "error_code": error.code,
                    "error_details": error.details,
                    "context": context,
                },
            )
        else:
            # Pass the exception itself: ``exc_info=True`` only works while an
            # exception is being handled, which is not guaranteed here.
            self.logger.error(
                f"Unexpected error: {str(error)}",
                exc_info=error,
                extra={"context": context},
            )

    async def _report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Await every reporter; a failing reporter is logged, not raised."""
        for reporter in self.error_reporters:
            try:
                await reporter(error, context)
            except Exception as e:
                self.logger.error(f"Error reporter failed: {e}")

    def _convert_to_mcp_error(self, error: Exception) -> "MCPError":
        """Map an arbitrary exception onto the MCP error hierarchy."""
        if isinstance(error, MCPError):
            return error

        if isinstance(error, ValueError):
            return ValidationError(str(error))
        if isinstance(error, PermissionError):
            return AuthorizationError(str(error))
        if isinstance(error, _BUILTIN_TIMEOUT_ERRORS):
            # ``TimeoutError`` here is expected to be the MCP class, which
            # shadows the built-in wherever the MCP errors are in scope.
            return TimeoutError("operation", 0)
        return InternalError(str(error))


# Error reporter example
class ErrorReporter:
    """Sends error reports to an HTTP error-collection service."""

    def __init__(
        self,
        error_service_url: str,
        logger: Optional[logging.Logger] = None,
    ):
        self.error_service_url = error_service_url
        # report_error logs delivery failures, so a logger must always exist.
        self.logger = logger or logging.getLogger(__name__)

    async def report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """POST a JSON description of `error` to the error service.

        Delivery problems are logged and never raised to the caller.
        """
        import aiohttp

        error_data = {
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
            "timestamp": datetime.now().isoformat(),
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.error_service_url,
                    json=error_data,
                ) as response:
                    if response.status != 200:
                        self.logger.error(
                            f"Failed to report error: {response.status}"
                        )
        except Exception as e:
            self.logger.error(f"Failed to report error: {e}")

3. Retry Mechanism

python
import asyncio
import time
from functools import wraps
from typing import Any, Callable, Optional, Type


def _is_retryable(error: Exception) -> bool:
    """Return the error's ``retryable`` flag, or True when it has none.

    MCP errors carry a ``retryable`` attribute; any other exception is
    treated as retryable.
    """
    return bool(getattr(error, "retryable", True))


class RetryStrategy:
    """Retry strategy base class.

    `attempt` is always the 1-based number of the attempt that just failed.
    """

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Decide whether another attempt should follow failed `attempt`."""
        raise NotImplementedError

    async def get_delay(self, attempt: int) -> float:
        """Return the seconds to wait after failed `attempt`."""
        raise NotImplementedError


class FixedDelayRetry(RetryStrategy):
    """Retry with the same delay before every new attempt."""

    def __init__(self, max_attempts: int = 3, delay: float = 1.0):
        self.max_attempts = max_attempts
        self.delay = delay

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error is retryable."""
        if attempt >= self.max_attempts:
            return False
        return _is_retryable(error)

    async def get_delay(self, attempt: int) -> float:
        """Return the fixed delay, regardless of `attempt`."""
        return self.delay


class ExponentialBackoffRetry(RetryStrategy):
    """Retry with a delay that grows by `backoff_factor` after each failure."""

    def __init__(
        self,
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0,
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error is retryable."""
        if attempt >= self.max_attempts:
            return False
        return _is_retryable(error)

    async def get_delay(self, attempt: int) -> float:
        """Return the backoff delay after failed `attempt`, capped at `max_delay`.

        The first failure (attempt 1) waits exactly `initial_delay`; each
        later failure multiplies the wait by `backoff_factor`.
        """
        # Attempts are 1-based, so the exponent starts at 0.
        exponent = max(attempt - 1, 0)
        delay = self.initial_delay * (self.backoff_factor ** exponent)
        return min(delay, self.max_delay)


class RetryManager:
    """Runs an async callable, retrying according to a `RetryStrategy`."""

    def __init__(self, retry_strategy: RetryStrategy):
        self.retry_strategy = retry_strategy

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` until it succeeds or retries stop.

        Returns:
            The result of the first successful call.

        Raises:
            Exception: The last error raised by `func`, once the strategy
                declines to retry.
        """
        attempt = 0
        while True:
            attempt += 1
            try:
                return await func(*args, **kwargs)
            except Exception as error:
                if not await self.retry_strategy.should_retry(attempt, error):
                    # Bare raise keeps the original traceback intact.
                    raise
                delay = await self.retry_strategy.get_delay(attempt)
            await asyncio.sleep(delay)


# Retry decorator
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
):
    """Decorate an async function so it is retried with exponential backoff.

    Args:
        max_attempts: Total number of attempts, including the first call.
        delay: Wait in seconds before the first retry.
        backoff_factor: Multiplier applied to the wait after each failure.
        max_delay: Upper bound for any single wait.
    """

    def decorator(func: Callable):
        retry_strategy = ExponentialBackoffRetry(
            max_attempts=max_attempts,
            initial_delay=delay,
            max_delay=max_delay,
            backoff_factor=backoff_factor,
        )
        retry_manager = RetryManager(retry_strategy)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_manager.execute_with_retry(func, *args, **kwargs)

        return wrapper

    return decorator

4. Circuit Breaker Mechanism

python
from enum import Enum from typing import Callable, Optional import asyncio class CircuitState(Enum): """Circuit breaker state""" CLOSED = "closed" # Normal state OPEN = "open" # Circuit broken state HALF_OPEN = "half_open" # Half-open state class CircuitBreaker: """Circuit breaker""" def __init__( self, failure_threshold: int = 5, success_threshold: int = 2, timeout: float = 60.0 ): self.failure_threshold = failure_threshold self.success_threshold = success_threshold self.timeout = timeout self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time: Optional[float] = None self.lock = asyncio.Lock() async def execute( self, func: Callable, *args, **kwargs ) -> Any: """Execute function""" async with self.lock: # Check circuit breaker state if self.state == CircuitState.OPEN: # Check if should try to recover if time.time() - self.last_failure_time > self.timeout: self.state = CircuitState.HALF_OPEN self.success_count = 0 else: raise MCPError( error_type=ErrorType.INTERNAL_ERROR, message="Circuit breaker is OPEN", code=503, retryable=True ) try: result = await func(*args, **kwargs) # Successful execution async with self.lock: if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.failure_count = 0 elif self.state == CircuitState.CLOSED: self.failure_count = 0 return result except Exception as error: # Execution failed async with self.lock: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN raise error def get_state(self) -> CircuitState: """Get circuit breaker state""" return self.state def reset(self): """Reset circuit breaker""" async with self.lock: self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = None # Circuit breaker decorator def circuit_breaker( failure_threshold: int = 5, 
success_threshold: int = 2, timeout: float = 60.0 ): """Circuit breaker decorator""" def decorator(func: Callable): breaker = CircuitBreaker( failure_threshold=failure_threshold, success_threshold=success_threshold, timeout=timeout ) @wraps(func) async def wrapper(*args, **kwargs): return await breaker.execute(func, *args, **kwargs) return wrapper return decorator

5. Fallback Strategy

python
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, Optional


class FallbackStrategy:
    """Fallback strategy base class."""

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Produce a substitute result for the operation that raised `error`."""
        raise NotImplementedError


class CacheFallback(FallbackStrategy):
    """Serve a previously cached value, looked up by ``context["cache_key"]``."""

    def __init__(self, cache: Dict[str, Any]):
        self.cache = cache

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return the cached value, or re-raise `error` when there is none."""
        cache_key = context.get("cache_key") if context else None

        if cache_key and cache_key in self.cache:
            return self.cache[cache_key]

        raise error


class DefaultFallback(FallbackStrategy):
    """Always answer with one fixed default value."""

    def __init__(self, default_value: Any):
        self.default_value = default_value

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return the configured default (the same object on every call)."""
        return self.default_value


class FallbackManager:
    """Chooses a fallback strategy by error type when a call fails."""

    def __init__(self):
        self.fallback_strategies: Dict["ErrorType", FallbackStrategy] = {}
        self.default_fallback: Optional[FallbackStrategy] = None

    def register_fallback(self, error_type: "ErrorType", fallback: FallbackStrategy):
        """Register the strategy used for errors of `error_type`."""
        self.fallback_strategies[error_type] = fallback

    def set_default_fallback(self, fallback: FallbackStrategy):
        """Set the strategy used when no type-specific one is registered."""
        self.default_fallback = fallback

    async def execute_with_fallback(
        self,
        func: Callable,
        context: Optional[Dict[str, Any]] = None,
        *args,
        **kwargs,
    ) -> Any:
        """Await ``func(*args, **kwargs)``, falling back when it raises.

        Non-MCP exceptions are wrapped in `InternalError` before a strategy
        is looked up.

        Returns:
            The result of `func`, or of the selected fallback strategy.

        Raises:
            MCPError: The (possibly wrapped) error when no strategy applies.
            Exception: Whatever the fallback strategy itself raises.
        """
        try:
            return await func(*args, **kwargs)
        except Exception as original:
            if isinstance(original, MCPError):
                error = original
            else:
                error = InternalError(str(original))

            strategy = self.fallback_strategies.get(
                error.error_type,
                self.default_fallback,
            )
            if strategy is not None:
                return await strategy.execute_fallback(error, context)

            if error is original:
                raise
            # Keep the real cause attached to the wrapped error.
            raise error from original


# Fallback decorator
def fallback(error_type: "ErrorType" = None, default_value: Any = None):
    """Decorate an async function with a default-value fallback.

    A fallback is registered only when both `error_type` and a non-None
    `default_value` are given; otherwise errors propagate as MCP errors.
    """

    def decorator(func: Callable):
        fallback_manager = FallbackManager()

        if error_type is not None and default_value is not None:
            fallback_manager.register_fallback(
                error_type,
                DefaultFallback(default_value),
            )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # No context is available at the decorator level.
            return await fallback_manager.execute_with_fallback(
                func, None, *args, **kwargs
            )

        return wrapper

    return decorator

6. Comprehensive Error Handling Example

python
from typing import Any, Dict, List

from mcp.server import Server


class RobustMCPServer(Server):
    """MCP server wired with error handling, retry, circuit breaker and fallback."""

    def __init__(self, name: str):
        super().__init__(name)

        # Initialize error handling components
        self.error_handler = ErrorHandler()
        self.retry_manager = RetryManager(ExponentialBackoffRetry())
        self.circuit_breaker = CircuitBreaker()
        self.fallback_manager = FallbackManager()

        # Configure error handling
        self._setup_error_handling()

    def _setup_error_handling(self):
        """Register the error handlers and fallback strategies of this server."""
        self.error_handler.register_handler(
            ErrorType.VALIDATION_ERROR,
            self._handle_validation_error,
        )
        self.error_handler.register_handler(
            ErrorType.RATE_LIMIT_ERROR,
            self._handle_rate_limit_error,
        )

        # The cache starts empty, so this fallback re-raises until it is filled.
        self.fallback_manager.register_fallback(
            ErrorType.EXTERNAL_SERVICE_ERROR,
            CacheFallback({}),
        )

    async def _handle_validation_error(
        self,
        error: ValidationError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Return the error plus suggestions derived from its details."""
        return {
            "error": error.to_dict(),
            "suggestions": self._get_validation_suggestions(error.details),
        }

    async def _handle_rate_limit_error(
        self,
        error: RateLimitError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Return the error plus how long the client should wait."""
        retry_after = error.details.get("retry_after", 60)

        return {
            "error": error.to_dict(),
            "retry_after": retry_after,
            "message": f"Please wait {retry_after} seconds before retrying",
        }

    def _get_validation_suggestions(self, details: Dict[str, Any]) -> List[str]:
        """Return suggestions for fixing a validation error (placeholder)."""
        suggestions = []

        # Provide suggestions based on error details
        # ...

        return suggestions

    # Decorators apply bottom-up: fallback is innermost, so a fallback value
    # reaches the circuit breaker and the retry wrapper as a normal result.
    @retry(max_attempts=3, delay=1.0)
    @circuit_breaker(failure_threshold=5, timeout=60.0)
    @fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={})
    async def call_external_service(
        self,
        service_url: str,
        params: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Call an external service, converting failures to ExternalServiceError.

        Raises:
            ExternalServiceError: When the call raises any exception.
        """
        try:
            # Call external service
            # ...
            pass
        except Exception as error:
            # Convert to MCP error, keeping the original exception as the cause.
            raise ExternalServiceError("external", str(error)) from error

Best Practices:

  1. Error Classification: Properly classify error types for targeted handling
  2. Retry Strategy: Choose an appropriate retry strategy based on the error type
  3. Circuit Breaker: Prevent cascading failures and protect system stability
  4. Fallback Strategy: Provide degraded services during failures to ensure basic functionality
  5. Error Logging: Log detailed error information for troubleshooting
  6. Monitoring and Alerting: Monitor error rates and detect issues promptly

Through comprehensive error handling and retry mechanisms, you can ensure MCP system stability and reliability.

标签:MCP