乐闻世界logo
搜索文章和话题

面试题手册

如何在 MCP 中管理和使用提示词(Prompts)?

MCP 的提示词(Prompts)管理功能允许预定义和管理可重复使用的提示词模板,提高开发效率和一致性。以下是详细的实现方法:提示词定义MCP 提示词包含名称、描述和参数化模板:{ "name": "prompt_name", "description": "提示词描述", "arguments": [ { "name": "param1", "description": "参数1的描述", "required": true } ]}1. 提示词注册from mcp.server import Serverserver = Server("my-mcp-server")@server.prompt( name="code_review", description="代码审查提示词模板")async def code_review_prompt( code: str, language: str = "python") -> str: """生成代码审查提示词""" return f""" 请审查以下 {language} 代码,并提供改进建议: 代码: {code} 请关注以下方面: 1. 代码质量和可读性 2. 性能优化建议 3. 安全性问题 4. 最佳实践 5. 潜在的 bug """@server.prompt( name="data_analysis", description="数据分析提示词模板")async def data_analysis_prompt( data: str, analysis_type: str = "summary") -> str: """生成数据分析提示词""" return f""" 请对以下数据进行 {analysis_type} 分析: 数据: {data} 请提供: 1. 数据摘要 2. 关键洞察 3. 趋势分析 4. 可视化建议 """2. 提示词模板引擎from string import Templateimport jsonfrom typing import Dict, Anyclass PromptTemplateEngine: def __init__(self): self.templates = {} def register_template(self, name: str, template: str, parameters: list): """注册提示词模板""" self.templates[name] = { "template": template, "parameters": parameters } def render(self, name: str, **kwargs) -> str: """渲染提示词模板""" if name not in self.templates: raise ValueError(f"模板 '{name}' 不存在") template_info = self.templates[name] template = template_info["template"] # 验证必需参数 required_params = [ p["name"] for p in template_info["parameters"] if p.get("required", False) ] missing_params = [ param for param in required_params if param not in kwargs ] if missing_params: raise ValueError( f"缺少必需参数: {', '.join(missing_params)}" ) # 使用 Template 渲染 t = Template(template) return t.substitute(**kwargs) def list_templates(self) -> list: """列出所有模板""" return [ { "name": name, "description": info.get("description", ""), "parameters": info["parameters"] } for name, info in self.templates.items() ]3. 
# Prompt version management
from datetime import datetime
from typing import Optional


class PromptVersionManager:
    """Keeps named, timestamped versions of prompt templates and tracks
    which version is currently active for each prompt name."""

    def __init__(self):
        # prompt_name -> {version -> {"template", "metadata", "created_at"}}
        self.versions = {}
        # prompt_name -> currently active version label
        self.current_versions = {}

    def save_version(
        self,
        prompt_name: str,
        version: str,
        template: str,
        metadata: dict = None
    ):
        """Store *template* under (prompt_name, version), stamping creation time."""
        bucket = self.versions.setdefault(prompt_name, {})
        bucket[version] = {
            "template": template,
            "metadata": metadata or {},
            "created_at": datetime.now().isoformat(),
        }

    def set_current_version(self, prompt_name: str, version: str):
        """Mark *version* as active; it must already have been saved."""
        known = self.versions.get(prompt_name, {})
        if version not in known:
            raise ValueError(f"版本 '{version}' 不存在")
        self.current_versions[prompt_name] = version

    def get_current_version(self, prompt_name: str) -> Optional[str]:
        """Return the active version label, or None if none was set."""
        return self.current_versions.get(prompt_name)

    def get_version(
        self,
        prompt_name: str,
        version: str
    ) -> Optional[dict]:
        """Return the stored record for (prompt_name, version), or None."""
        return self.versions.get(prompt_name, {}).get(version)

    def list_versions(self, prompt_name: str) -> list:
        """Return every version label saved for *prompt_name* (possibly empty)."""
        return list(self.versions.get(prompt_name, ()))

# 4.
# Prompt caching
import hashlib
import json
from typing import Optional


class PromptCache:
    """Fixed-size FIFO cache for rendered prompts, keyed by prompt name
    plus a canonical (sorted-key JSON) serialization of its parameters."""

    def __init__(self, max_size: int = 1000):
        self.cache = {}  # cache key -> rendered prompt text
        self.max_size = max_size
        # prompt_name -> set of cache keys. Needed so invalidate() can find
        # every entry of one prompt: the key is an MD5 of name *and* params,
        # so the name cannot be recovered from the key by prefix matching.
        self._keys_by_name = {}

    def _generate_key(
        self,
        prompt_name: str,
        params: dict
    ) -> str:
        """Build a deterministic cache key from the name and the parameters."""
        param_str = json.dumps(params, sort_keys=True)
        combined = f"{prompt_name}:{param_str}"
        return hashlib.md5(combined.encode()).hexdigest()

    def get(self, prompt_name: str, params: dict) -> Optional[str]:
        """Return the cached prompt for (name, params), or None on a miss."""
        key = self._generate_key(prompt_name, params)
        return self.cache.get(key)

    def set(self, prompt_name: str, params: dict, prompt: str):
        """Cache a rendered prompt, evicting the oldest entry when full."""
        if len(self.cache) >= self.max_size:
            # dicts preserve insertion order, so the first key is the oldest.
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
            for keys in self._keys_by_name.values():
                keys.discard(oldest_key)
        key = self._generate_key(prompt_name, params)
        self.cache[key] = prompt
        self._keys_by_name.setdefault(prompt_name, set()).add(key)

    def clear(self):
        """Drop every cached entry."""
        self.cache.clear()
        self._keys_by_name.clear()

    def invalidate(self, prompt_name: str):
        """Remove all cached entries that belong to *prompt_name*.

        Bug fix: the previous implementation compared each cache key against
        an MD5 prefix of the prompt name alone, which can never match because
        keys are hashes of name *and* params — so it invalidated nothing.
        Keys are now tracked per prompt name explicitly.
        """
        for key in self._keys_by_name.pop(prompt_name, ()):
            self.cache.pop(key, None)

# 5.
提示词测试和验证class PromptTester: def __init__(self, template_engine: PromptTemplateEngine): self.engine = template_engine def test_template( self, template_name: str, test_cases: list ) -> dict: """测试提示词模板""" results = { "template": template_name, "total_tests": len(test_cases), "passed": 0, "failed": 0, "errors": [] } for i, test_case in enumerate(test_cases): try: # 渲染提示词 prompt = self.engine.render( template_name, **test_case["params"] ) # 验证输出 if "expected_contains" in test_case: for expected in test_case["expected_contains"]: if expected not in prompt: results["errors"].append({ "test": i, "error": f"期望包含 '{expected}' 但未找到" }) results["failed"] += 1 break else: results["passed"] += 1 else: results["passed"] += 1 except Exception as e: results["errors"].append({ "test": i, "error": str(e) }) results["failed"] += 1 return results def validate_parameters( self, template_name: str, params: dict ) -> tuple[bool, str]: """验证参数""" try: # 尝试渲染提示词 self.engine.render(template_name, **params) return True, "" except Exception as e: return False, str(e)6. 
提示词分析和优化import refrom collections import Counterclass PromptAnalyzer: def analyze(self, prompt: str) -> dict: """分析提示词""" return { "length": len(prompt), "word_count": len(prompt.split()), "sentence_count": len(re.split(r'[.!?]+', prompt)), "token_estimate": self._estimate_tokens(prompt), "complexity": self._calculate_complexity(prompt), "variables": self._extract_variables(prompt) } def _estimate_tokens(self, text: str) -> int: """估算 token 数量""" # 粗略估算:英文约 4 字符/token,中文约 1.5 字符/token chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text)) english_chars = len(re.findall(r'[a-zA-Z]', text)) return int(chinese_chars / 1.5 + english_chars / 4) def _calculate_complexity(self, prompt: str) -> float: """计算复杂度""" words = prompt.split() unique_words = set(words) # 词汇多样性 diversity = len(unique_words) / len(words) if words else 0 # 平均句子长度 sentences = re.split(r'[.!?]+', prompt) avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0 return (diversity + min(avg_sentence_length / 20, 1)) / 2 def _extract_variables(self, prompt: str) -> list: """提取变量""" return re.findall(r'\{(\w+)\}', prompt)最佳实践:参数化设计:使用参数化模板提高复用性版本控制:对重要提示词实施版本管理缓存策略:缓存渲染结果提高性能测试覆盖:编写测试用例确保提示词质量性能监控:监控提示词的 token 使用和性能文档完善:为每个提示词提供清晰的文档通过完善的提示词管理机制,可以提高 MCP 系统的开发效率和一致性。
阅读 0·2月19日 21:38

MCP 如何支持多租户架构?

MCP 的多租户支持对于企业级应用至关重要,它允许在单一 MCP 服务器实例中为多个客户或组织提供隔离的服务。以下是详细的实现方法:多租户架构设计MCP 多租户应考虑以下方面:数据隔离:确保不同租户的数据完全隔离资源隔离:隔离计算资源和配额安全隔离:实现租户级别的认证和授权性能隔离:防止单个租户影响其他租户1. 租户识别和上下文from typing import Optionalfrom dataclasses import dataclass@dataclassclass TenantContext: """租户上下文""" tenant_id: str tenant_name: str user_id: str permissions: list quotas: dictclass TenantContextManager: def __init__(self): self.contexts = {} def create_context( self, tenant_id: str, tenant_name: str, user_id: str, permissions: list, quotas: dict = None ) -> TenantContext: """创建租户上下文""" context = TenantContext( tenant_id=tenant_id, tenant_name=tenant_name, user_id=user_id, permissions=permissions, quotas=quotas or self._get_default_quotas(tenant_id) ) self.contexts[tenant_id] = context return context def get_context(self, tenant_id: str) -> Optional[TenantContext]: """获取租户上下文""" return self.contexts.get(tenant_id) def set_current_context(self, tenant_id: str): """设置当前租户上下文""" context = self.get_context(tenant_id) if not context: raise ValueError(f"租户 {tenant_id} 不存在") # 使用线程本地存储或异步上下文变量 import contextvars current_tenant.set(context) def _get_default_quotas(self, tenant_id: str) -> dict: """获取默认配额""" return { "max_tools": 100, "max_resources": 1000, "max_requests_per_minute": 1000, "max_storage_mb": 1024 }# 当前租户上下文变量current_tenant = contextvars.ContextVar('current_tenant', default=None)2. 
数据隔离from sqlalchemy import create_engine, Column, String, Integer, Textfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_sessionBase = declarative_base()class TenantData(Base): """租户数据表""" __tablename__ = 'tenant_data' id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, index=True) data_key = Column(String(100), nullable=False) data_value = Column(Text) __table_args__ = ( # 确保租户隔离 Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True), )class MultiTenantDatabase: def __init__(self, database_url: str): self.engine = create_engine(database_url) Base.metadata.create_all(self.engine) self.SessionLocal = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=self.engine) ) def get_session(self, tenant_id: str): """获取租户专属的数据库会话""" session = self.SessionLocal() # 添加租户过滤器 from sqlalchemy import event @event.listens_for(session, 'before_flush') def add_tenant_filter(session, context, instances): for instance in session.new: if hasattr(instance, 'tenant_id'): instance.tenant_id = tenant_id return session def query_tenant_data( self, tenant_id: str, data_key: str ) -> Optional[str]: """查询租户数据""" session = self.get_session(tenant_id) try: result = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() return result.data_value if result else None finally: session.close() def save_tenant_data( self, tenant_id: str, data_key: str, data_value: str ): """保存租户数据""" session = self.get_session(tenant_id) try: existing = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() if existing: existing.data_value = data_value else: new_data = TenantData( tenant_id=tenant_id, data_key=data_key, data_value=data_value ) session.add(new_data) session.commit() except Exception as e: session.rollback() raise e finally: session.close()3. 
# Resource quota management
from collections import defaultdict
import time


class QuotaManager:
    """Tracks per-tenant quota limits and consumption, plus a sliding-window
    request rate limit."""

    def __init__(self):
        self.quotas = {}  # tenant_id -> {quota_type -> limit}
        self.usage = defaultdict(lambda: defaultdict(int))  # consumed amounts
        self.rate_limits = {}  # tenant_id -> list of recent request timestamps

    def set_quota(
        self,
        tenant_id: str,
        quota_type: str,
        limit: int
    ):
        """Define (or overwrite) the limit for one quota type of a tenant."""
        self.quotas.setdefault(tenant_id, {})[quota_type] = limit

    def check_quota(
        self,
        tenant_id: str,
        quota_type: str,
        amount: int = 1
    ) -> bool:
        """True when consuming *amount* would still be within the limit.
        Tenants or quota types with no configured limit are unrestricted."""
        limit = self.quotas.get(tenant_id, {}).get(quota_type)
        if limit is None:
            return True
        return self.usage[tenant_id][quota_type] + amount <= limit

    def consume_quota(
        self,
        tenant_id: str,
        quota_type: str,
        amount: int = 1
    ) -> bool:
        """Check and record consumption; False if it would exceed the limit."""
        if not self.check_quota(tenant_id, quota_type, amount):
            return False
        self.usage[tenant_id][quota_type] += amount
        return True

    def get_usage(
        self,
        tenant_id: str,
        quota_type: str
    ) -> int:
        """Amount consumed so far for this tenant and quota type."""
        return self.usage[tenant_id][quota_type]

    def reset_usage(self, tenant_id: str):
        """Zero out every quota counter of one tenant."""
        if tenant_id in self.usage:
            self.usage[tenant_id].clear()

    def check_rate_limit(
        self,
        tenant_id: str,
        window: int = 60,
        max_requests: int = 100
    ) -> bool:
        """Sliding-window rate limiter: allow at most *max_requests* calls
        per *window* seconds; a permitted call is recorded immediately."""
        now = time.time()
        # Keep only timestamps still inside the window.
        recent = [
            stamp for stamp in self.rate_limits.get(tenant_id, [])
            if now - stamp < window
        ]
        self.rate_limits[tenant_id] = recent
        if len(recent) >= max_requests:
            return False
        recent.append(now)
        return True

# 4.
租户级别的工具和资源from mcp.server import Serverfrom functools import wrapsclass MultiTenantServer(Server): def __init__(self, name: str, tenant_manager: TenantContextManager): super().__init__(name) self.tenant_manager = tenant_manager self.tenant_tools = defaultdict(dict) self.tenant_resources = defaultdict(dict) def tenant_tool( self, name: str, description: str, tenant_id: str = None ): """租户专属工具装饰器""" def decorator(func): # 注册工具 self.tenant_tools[tenant_id or "default"][name] = { "function": func, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # 获取当前租户 context = current_tenant.get() if not context: raise PermissionError("未找到租户上下文") # 检查租户权限 if tenant_id and context.tenant_id != tenant_id: raise PermissionError("无权访问此工具") # 执行工具 return await func(*args, **kwargs) return wrapper return decorator def tenant_resource( self, uri: str, name: str, description: str, tenant_id: str = None ): """租户专属资源装饰器""" def decorator(func): # 注册资源 self.tenant_resources[tenant_id or "default"][uri] = { "function": func, "name": name, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # 获取当前租户 context = current_tenant.get() if not context: raise PermissionError("未找到租户上下文") # 检查租户权限 if tenant_id and context.tenant_id != tenant_id: raise PermissionError("无权访问此资源") # 执行资源 return await func(*args, **kwargs) return wrapper return decorator async def list_tools(self, tenant_id: str = None) -> list: """列出可用工具""" context = current_tenant.get() if not context: return [] # 获取默认工具和租户专属工具 tools = [] # 添加默认工具 for name, tool_info in self.tenant_tools["default"].items(): tools.append({ "name": name, "description": tool_info["description"] }) # 添加租户专属工具 if context.tenant_id in self.tenant_tools: for name, tool_info in self.tenant_tools[context.tenant_id].items(): tools.append({ "name": name, "description": tool_info["description"] }) return tools5. 
租户认证和授权import jwtfrom datetime import datetime, timedeltafrom typing import Dict, Anyclass TenantAuthenticator: def __init__(self, secret_key: str): self.secret_key = secret_key def generate_token( self, tenant_id: str, user_id: str, permissions: list, expires_in: int = 3600 ) -> str: """生成租户令牌""" payload = { "tenant_id": tenant_id, "user_id": user_id, "permissions": permissions, "exp": datetime.utcnow() + timedelta(seconds=expires_in), "iat": datetime.utcnow() } token = jwt.encode(payload, self.secret_key, algorithm="HS256") return token def verify_token(self, token: str) -> Dict[str, Any]: """验证租户令牌""" try: payload = jwt.decode(token, self.secret_key, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise ValueError("令牌已过期") except jwt.InvalidTokenError: raise ValueError("无效的令牌") def check_permission( self, token: str, required_permission: str ) -> bool: """检查权限""" payload = self.verify_token(token) permissions = payload.get("permissions", []) return required_permission in permissions or "admin" in permissions6. 
租户监控和报告from collections import defaultdictfrom datetime import datetime, timedeltaclass TenantMonitor: def __init__(self): self.metrics = defaultdict(lambda: defaultdict(list)) def record_metric( self, tenant_id: str, metric_name: str, value: float ): """记录指标""" timestamp = datetime.now() self.metrics[tenant_id][metric_name].append({ "value": value, "timestamp": timestamp }) # 限制历史记录大小 if len(self.metrics[tenant_id][metric_name]) > 1000: self.metrics[tenant_id][metric_name] = \ self.metrics[tenant_id][metric_name][-1000:] def get_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> list: """获取指标""" if tenant_id not in self.metrics: return [] if metric_name not in self.metrics[tenant_id]: return [] records = self.metrics[tenant_id][metric_name] if since: records = [ record for record in records if record["timestamp"] >= since ] return records def get_aggregated_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> dict: """获取聚合指标""" records = self.get_metrics(tenant_id, metric_name, since) if not records: return {} values = [record["value"] for record in records] return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values) } def generate_tenant_report( self, tenant_id: str, since: datetime = None ) -> dict: """生成租户报告""" if not since: since = datetime.now() - timedelta(days=7) report = { "tenant_id": tenant_id, "period": { "start": since, "end": datetime.now() }, "metrics": {} } if tenant_id in self.metrics: for metric_name in self.metrics[tenant_id]: report["metrics"][metric_name] = \ self.get_aggregated_metrics(tenant_id, metric_name, since) return report最佳实践:数据隔离:使用租户 ID 作为所有数据表的主键或索引配额管理:为每个租户设置合理的资源配额权限控制:实施细粒度的租户级权限控制性能监控:监控每个租户的资源使用情况安全审计:记录所有租户操作用于审计弹性扩展:根据租户需求动态扩展资源通过完善的多租户支持,可以在单一 MCP 服务器实例中为多个客户或组织提供隔离、安全、高效的服务。
阅读 0·2月19日 21:38

MCP 的消息格式是怎样的?有哪些常用的消息类型?

MCP 的消息格式基于 JSON-RPC 2.0 协议,并进行了扩展以支持 AI 模型与外部系统的交互。以下是详细的消息格式说明:基础消息结构所有 MCP 消息都遵循 JSON-RPC 2.0 的基本格式:{ "jsonrpc": "2.0", "id": "unique-request-id", "method": "method-name", "params": { ... }}1. 请求消息(Request)用于客户端向服务器发送工具调用请求:{ "jsonrpc": "2.0", "id": "req-123", "method": "tools/call", "params": { "name": "calculate", "arguments": { "expression": "2 + 2" } }}2. 响应消息(Response)服务器返回执行结果:{ "jsonrpc": "2.0", "id": "req-123", "result": { "content": [ { "type": "text", "text": "结果: 4" } ] }}3. 错误响应(Error Response)当请求失败时返回:{ "jsonrpc": "2.0", "id": "req-123", "error": { "code": -32602, "message": "Invalid params", "data": { "details": "参数 'expression' 不能为空" } }}4. 通知消息(Notification)服务器主动推送的消息(无需响应):{ "jsonrpc": "2.0", "method": "notifications/progress", "params": { "progress": 0.5, "message": "处理中..." }}常用方法类型tools/list - 获取可用工具列表{ "jsonrpc": "2.0", "id": "req-001", "method": "tools/list"}resources/list - 获取可用资源列表{ "jsonrpc": "2.0", "id": "req-002", "method": "resources/list"}resources/read - 读取资源内容{ "jsonrpc": "2.0", "id": "req-003", "method": "resources/read", "params": { "uri": "file:///data/config.json" }}prompts/list - 获取提示词列表{ "jsonrpc": "2.0", "id": "req-004", "method": "prompts/list"}错误代码MCP 定义了标准的错误代码:| 代码 | 名称 | 描述 ||------|------|------|| -32700 | Parse error | JSON 解析错误 || -32600 | Invalid Request | 无效的请求 || -32601 | Method not found | 方法不存在 || -32602 | Invalid params | 无效的参数 || -32603 | Internal error | 内部错误 || -32000 | Server error | 服务器错误 |内容类型(Content Types)MCP 支持多种内容类型:{ "type": "text", "text": "纯文本内容"}{ "type": "image", "data": "base64-encoded-image-data", "mimeType": "image/png"}{ "type": "resource", "uri": "file:///data/report.pdf", "mimeType": "application/pdf"}消息流(Message Streaming)对于长时间运行的操作,支持流式响应:{ "jsonrpc": "2.0", "id": "req-005", "method": "tools/call", "params": { "name": "generate_report", "arguments": { "stream": true } }}最佳实践:唯一 ID:每个请求必须有唯一的 ID类型验证:严格验证参数类型和格式错误处理:提供详细的错误信息和数据超时处理:实现请求超时机制日志记录:记录所有消息用于调试和审计理解 MCP 的消息格式对于实现兼容的服务器和客户端至关重要。
阅读 0·2月19日 21:35

如何部署和运维 MCP 系统?有哪些最佳实践?

MCP 的部署和运维对于生产环境的稳定运行至关重要。以下是详细的部署策略和运维最佳实践:部署架构MCP 可以采用多种部署架构:单机部署:适合开发和测试环境容器化部署:使用 Docker 容器Kubernetes 部署:适合大规模生产环境无服务器部署:使用 AWS Lambda、Azure Functions 等1. Docker 容器化部署# DockerfileFROM python:3.11-slimWORKDIR /app# 安装依赖COPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txt# 复制应用代码COPY . .# 暴露端口EXPOSE 8000# 健康检查HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1# 启动应用CMD ["python", "-m", "mcp.server", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.ymlversion: '3.8'services: mcp-server: build: . ports: - "8000:8000" environment: - MCP_HOST=0.0.0.0 - MCP_PORT=8000 - LOG_LEVEL=info - DATABASE_URL=postgresql://user:pass@db:5432/mcp volumes: - ./config:/app/config - ./logs:/app/logs depends_on: - db - redis restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s timeout: 10s retries: 3 db: image: postgres:15 environment: - POSTGRES_DB=mcp - POSTGRES_USER=user - POSTGRES_PASSWORD=pass volumes: - postgres_data:/var/lib/postgresql/data restart: unless-stopped redis: image: redis:7-alpine volumes: - redis_data:/data restart: unless-stoppedvolumes: postgres_data: redis_data:2. 
Kubernetes 部署# deployment.yamlapiVersion: apps/v1kind: Deploymentmetadata: name: mcp-server labels: app: mcp-serverspec: replicas: 3 selector: matchLabels: app: mcp-server template: metadata: labels: app: mcp-server spec: containers: - name: mcp-server image: your-registry/mcp-server:latest ports: - containerPort: 8000 env: - name: MCP_HOST value: "0.0.0.0" - name: MCP_PORT value: "8000" - name: DATABASE_URL valueFrom: secretKeyRef: name: mcp-secrets key: database-url resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /ready port: 8000 initialDelaySeconds: 5 periodSeconds: 5---apiVersion: v1kind: Servicemetadata: name: mcp-serverspec: selector: app: mcp-server ports: - protocol: TCP port: 80 targetPort: 8000 type: LoadBalancer---apiVersion: autoscaling/v2kind: HorizontalPodAutoscalermetadata: name: mcp-server-hpaspec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: mcp-server minReplicas: 3 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Resource resource: name: memory target: type: Utilization averageUtilization: 803. CI/CD 流水线# .github/workflows/deploy.ymlname: Deploy MCP Serveron: push: branches: [main] pull_request: branches: [main]jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.11' - name: Install dependencies run: | pip install -r requirements.txt pip install pytest pytest-cov - name: Run tests run: | pytest --cov=mcp --cov-report=xml - name: Upload coverage uses: codecov/codecov-action@v3 build: needs: test runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Build Docker image run: | docker build -t mcp-server:${{ github.sha }} . 
- name: Push to registry run: | echo ${{ secrets.DOCKER_PASSWORD }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin docker tag mcp-server:${{ github.sha }} your-registry/mcp-server:latest docker push your-registry/mcp-server:latest deploy: needs: build runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - name: Deploy to Kubernetes uses: azure/k8s-deploy@v4 with: manifests: | k8s/deployment.yaml images: | your-registry/mcp-server:latest kubeconfig: ${{ secrets.KUBE_CONFIG }}4. 监控和日志# monitoring.pyfrom prometheus_client import Counter, Histogram, Gauge, start_http_serverimport loggingfrom logging.handlers import RotatingFileHandler# Prometheus 指标REQUEST_COUNT = Counter('mcp_requests_total', 'Total requests', ['method', 'endpoint'])REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')ACTIVE_CONNECTIONS = Gauge('mcp_active_connections', 'Active connections')ERROR_COUNT = Counter('mcp_errors_total', 'Total errors', ['error_type'])# 日志配置def setup_logging(): logger = logging.getLogger('mcp') logger.setLevel(logging.INFO) # 文件处理器 file_handler = RotatingFileHandler( 'logs/mcp.log', maxBytes=10*1024*1024, # 10MB backupCount=5 ) file_handler.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter( logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') ) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger# 启动监控服务器def start_metrics_server(port: int = 9090): start_http_server(port) logging.info(f"Metrics server started on port {port}")5. 
配置管理# config.pyimport osfrom pydantic import BaseSettings, Fieldclass MCPSettings(BaseSettings): # 服务器配置 host: str = Field(default="0.0.0.0", env="MCP_HOST") port: int = Field(default=8000, env="MCP_PORT") # 数据库配置 database_url: str = Field(..., env="DATABASE_URL") database_pool_size: int = Field(default=10, env="DATABASE_POOL_SIZE") # Redis 配置 redis_url: str = Field(default="redis://localhost:6379", env="REDIS_URL") # 日志配置 log_level: str = Field(default="INFO", env="LOG_LEVEL") log_file: str = Field(default="logs/mcp.log", env="LOG_FILE") # 安全配置 secret_key: str = Field(..., env="SECRET_KEY") jwt_algorithm: str = Field(default="HS256", env="JWT_ALGORITHM") # 性能配置 max_connections: int = Field(default=100, env="MAX_CONNECTIONS") request_timeout: int = Field(default=30, env="REQUEST_TIMEOUT") # 缓存配置 cache_ttl: int = Field(default=3600, env="CACHE_TTL") class Config: env_file = ".env" case_sensitive = False# 加载配置settings = MCPSettings()6. 备份和恢复#!/bin/bash# backup.sh# 数据库备份backup_database() { echo "Backing up database..." pg_dump $DATABASE_URL > backups/db_$(date +%Y%m%d_%H%M%S).sql echo "Database backup completed"}# 配置备份backup_config() { echo "Backing up configuration..." tar -czf backups/config_$(date +%Y%m%d_%H%M%S).tar.gz config/ echo "Configuration backup completed"}# 日志备份backup_logs() { echo "Backing up logs..." tar -czf backups/logs_$(date +%Y%m%d_%H%M%S).tar.gz logs/ echo "Logs backup completed"}# 清理旧备份cleanup_old_backups() { echo "Cleaning up old backups (older than 7 days)..." find backups/ -name "*.sql" -mtime +7 -delete find backups/ -name "*.tar.gz" -mtime +7 -delete echo "Cleanup completed"}# 主函数main() { mkdir -p backups backup_database backup_config backup_logs cleanup_old_backups echo "All backups completed successfully"}main7. 
故障排查# diagnostics.pyimport psutilimport asynciofrom typing import Dict, Anyclass SystemDiagnostics: @staticmethod def get_system_info() -> Dict[str, Any]: """获取系统信息""" return { "cpu_percent": psutil.cpu_percent(interval=1), "memory": { "total": psutil.virtual_memory().total, "available": psutil.virtual_memory().available, "percent": psutil.virtual_memory().percent }, "disk": { "total": psutil.disk_usage('/').total, "used": psutil.disk_usage('/').used, "percent": psutil.disk_usage('/').percent }, "network": { "connections": len(psutil.net_connections()), "io_counters": psutil.net_io_counters()._asdict() } } @staticmethod async def check_database_connection(db_url: str) -> bool: """检查数据库连接""" try: # 实现数据库连接检查 return True except Exception as e: logging.error(f"Database connection failed: {e}") return False @staticmethod async def check_redis_connection(redis_url: str) -> bool: """检查 Redis 连接""" try: # 实现 Redis 连接检查 return True except Exception as e: logging.error(f"Redis connection failed: {e}") return False @staticmethod def get_service_status() -> Dict[str, bool]: """获取服务状态""" return { "database": asyncio.run(SystemDiagnostics.check_database_connection(settings.database_url)), "redis": asyncio.run(SystemDiagnostics.check_redis_connection(settings.redis_url)), "api": True # 如果能运行到这里,API 服务是正常的 }最佳实践:容器化:使用 Docker 容器确保环境一致性自动化部署:使用 CI/CD 自动化部署流程监控告警:实施全面的监控和告警机制日志集中:集中管理日志,便于分析和排查备份策略:定期备份重要数据和配置灾难恢复:制定并测试灾难恢复计划安全加固:实施安全加固措施性能优化:持续监控和优化系统性能通过完善的部署和运维策略,可以确保 MCP 系统在生产环境中的稳定运行。
阅读 0·2月19日 21:35

MCP 中的资源管理是如何工作的?

MCP 的资源管理机制允许 LLM 访问和操作外部资源,如文件、数据库记录、API 端点等。以下是详细的实现方法:资源定义MCP 资源通过 URI(统一资源标识符)进行标识和访问:{ "uri": "file:///path/to/resource", "name": "资源名称", "description": "资源描述", "mimeType": "text/plain"}1. 资源类型MCP 支持多种资源类型:文件资源:file:///path/to/fileHTTP 资源:http://example.com/api/resource数据库资源:db://database/table/id自定义资源:custom://resource-type/id2. 资源注册from mcp.server import Serverfrom mcp.types import Resourceserver = Server("my-mcp-server")@server.resource( uri="file:///config/app.json", name="应用配置", description="应用程序的配置文件", mimeType="application/json")async def get_app_config() -> str: """获取应用配置""" return """ { "name": "MyApp", "version": "1.0.0", "settings": { "debug": false, "maxConnections": 100 } } """@server.resource( uri="db://users/{id}", name="用户信息", description="用户详细信息", mimeType="application/json")async def get_user(id: str) -> str: """获取用户信息""" user = await database.get_user(id) return json.dumps(user)3. 资源访问控制class ResourceAccessControl: def __init__(self): self.permissions = {} self.acl = {} def grant_permission(self, user: str, resource_pattern: str, access: str): """授予资源访问权限""" if user not in self.permissions: self.permissions[user] = [] self.permissions[user].append({ "pattern": resource_pattern, "access": access # "read", "write", "delete" }) def check_permission(self, user: str, resource_uri: str, access: str) -> bool: """检查访问权限""" if user not in self.permissions: return False for perm in self.permissions[user]: if self._match_pattern(perm["pattern"], resource_uri): if access in perm["access"] or perm["access"] == "all": return True return False def _match_pattern(self, pattern: str, uri: str) -> bool: """匹配资源模式""" import re # 将通配符转换为正则表达式 regex = pattern.replace("*", ".*").replace("?", ".") return re.match(regex, uri) is not None4. 
# Resource caching
from datetime import datetime, timedelta
from typing import List, Optional
import hashlib


class ResourceCache:
    """TTL cache for resource payloads, keyed by resource URI."""

    def __init__(self, ttl: int = 3600):
        self.cache = {}  # resource_uri -> {"data", "expires", "hash"}
        self.ttl = ttl  # seconds each entry stays valid

    def get(self, resource_uri: str) -> Optional[str]:
        """Return cached data, or None on a miss; expired entries are
        dropped lazily on lookup."""
        entry = self.cache.get(resource_uri)
        if entry is None:
            return None
        if datetime.now() > entry["expires"]:
            del self.cache[resource_uri]
            return None
        return entry["data"]

    def set(self, resource_uri: str, data: str):
        """Cache *data* with a fresh expiry and an MD5 content hash."""
        self.cache[resource_uri] = {
            "data": data,
            "expires": datetime.now() + timedelta(seconds=self.ttl),
            "hash": hashlib.md5(data.encode()).hexdigest(),
        }

    def invalidate(self, resource_uri: str):
        """Drop one entry if present (no-op otherwise)."""
        self.cache.pop(resource_uri, None)

    def clear(self):
        """Drop all entries."""
        self.cache.clear()


# Resource version control
class ResourceVersionManager:
    """Stores named, timestamped versions of a resource's contents."""

    def __init__(self):
        # resource_uri -> {version -> {"data", "timestamp"}}
        self.versions = {}

    def save_version(self, resource_uri: str, data: str, version: str):
        """Record *data* under (resource_uri, version) with a timestamp."""
        self.versions.setdefault(resource_uri, {})[version] = {
            "data": data,
            "timestamp": datetime.now().isoformat(),
        }

    def get_version(self, resource_uri: str, version: str) -> Optional[str]:
        """Return the data saved for one specific version, or None."""
        if resource_uri not in self.versions:
            return None
        return self.versions[resource_uri].get(version, {}).get("data")

    def list_versions(self, resource_uri: str) -> List[str]:
        """All version labels saved for the resource, in save order."""
        if resource_uri not in self.versions:
            return []
        return list(self.versions[resource_uri].keys())

    def get_latest_version(self, resource_uri: str) -> Optional[str]:
        """Return the most recently saved version label, or None.

        Bug fix: the previous code used max(versions), i.e. lexicographic
        order, which mis-orders numeric labels (e.g. "10" < "9"). Versions
        are kept in insertion order, so the last label is the newest.
        """
        versions = self.list_versions(resource_uri)
        if not versions:
            return None
        return versions[-1]

# 6.
# Resource monitoring
from datetime import datetime
from typing import List


class ResourceMonitor:
    """Records every resource access and keeps aggregate counters."""

    def __init__(self):
        self.access_log = []  # chronological list of access records
        self.metrics = {
            "total_access": 0,
            "unique_resources": set(),
            "access_by_type": {},  # URI scheme -> access count
        }

    def log_access(self, resource_uri: str, user: str, action: str):
        """Append one access record and update the aggregate counters."""
        self.access_log.append({
            "timestamp": datetime.now().isoformat(),
            "resource": resource_uri,
            "user": user,
            "action": action,
        })
        stats = self.metrics
        stats["total_access"] += 1
        stats["unique_resources"].add(resource_uri)
        # Group counts by the URI scheme (the part before "://").
        scheme = resource_uri.split("://")[0]
        by_type = stats["access_by_type"]
        by_type[scheme] = by_type.get(scheme, 0) + 1

    def get_metrics(self) -> dict:
        """Snapshot of the aggregate counters."""
        return {
            "total_access": self.metrics["total_access"],
            "unique_resources": len(self.metrics["unique_resources"]),
            "access_by_type": self.metrics["access_by_type"],
        }

    def get_access_history(self, resource_uri: str, limit: int = 100) -> List[dict]:
        """The most recent *limit* records for one resource, oldest first."""
        matching = [
            entry for entry in self.access_log
            if entry["resource"] == resource_uri
        ]
        return matching[-limit:]

# 7.
资源生命周期管理class ResourceLifecycleManager: def __init__(self): self.resources = {} self.cleanup_interval = 3600 # 1小时 def register_resource(self, resource_uri: str, metadata: dict): """注册资源""" self.resources[resource_uri] = { "metadata": metadata, "created_at": datetime.now(), "last_accessed": datetime.now(), "access_count": 0 } def access_resource(self, resource_uri: str): """访问资源""" if resource_uri in self.resources: self.resources[resource_uri]["last_accessed"] = datetime.now() self.resources[resource_uri]["access_count"] += 1 def cleanup_old_resources(self, max_age_days: int = 30): """清理旧资源""" cutoff = datetime.now() - timedelta(days=max_age_days) to_remove = [] for uri, info in self.resources.items(): if info["last_accessed"] < cutoff: to_remove.append(uri) for uri in to_remove: del self.resources[uri] return len(to_remove) def get_resource_stats(self) -> dict: """获取资源统计""" return { "total_resources": len(self.resources), "total_accesses": sum(r["access_count"] for r in self.resources.values()), "oldest_resource": min( (r["created_at"] for r in self.resources.values()), default=None ) }最佳实践:URI 设计:使用清晰、层次化的 URI 结构权限控制:实施最小权限原则缓存策略:根据资源特性设置合适的 TTL监控和日志:记录所有资源访问操作版本管理:对重要资源实施版本控制定期清理:清理不再使用的资源通过完善的资源管理机制,可以确保 MCP 系统中资源的安全、高效访问。
阅读 0·2月19日 21:35

如何在 MCP 中实现流式处理?

# MCP streaming allows real-time transfer of large volumes of data or the
# results of long-running operations. MCP supports two streaming modes:
#   - server push: the server proactively pushes data
#   - client-requested: the client asks for a streamed response

from typing import AsyncIterator, Dict, Any
import asyncio
import json

from mcp.server import Server
from mcp.types import Tool

server = Server("my-mcp-server")


# 1. Streaming tool definition
@server.tool(
    name="stream_data",
    description="流式返回大量数据"
)
async def stream_data(
    count: int,
    batch_size: int = 10
) -> AsyncIterator[Dict[str, Any]]:  # FIX: was AsyncIterator[str]; dicts are yielded
    """Yield ``count`` integers in batches of ``batch_size`` with progress info."""
    for i in range(0, count, batch_size):
        upper = min(i + batch_size, count)
        batch = list(range(i, upper))
        yield {
            "type": "data",
            "batch": batch,
            # FIX: use the clamped upper bound so the final partial batch
            # cannot report a progress value greater than 1.0.
            "progress": upper / count
        }
        # Simulate processing latency.
        await asyncio.sleep(0.1)
    # Completion signal.
    yield {
        "type": "done",
        "total": count
    }


# 2. Stream response processor
class StreamProcessor:
    """Drives a response stream, tracking per-stream status while it runs."""

    def __init__(self):
        self.active_streams = {}  # stream_id -> {"status", "start_time", ...}

    async def process_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Re-yield every chunk of ``stream`` after normalizing it.

        Status is kept in ``active_streams`` while the stream is live; the
        entry is removed when the stream ends for any reason.
        """
        self.active_streams[stream_id] = {
            "status": "active",
            "start_time": asyncio.get_event_loop().time()
        }
        try:
            async for chunk in stream:
                processed = await self._process_chunk(chunk)
                if processed.get("type") == "done":
                    self.active_streams[stream_id]["status"] = "completed"
                yield processed
        except Exception as e:
            self.active_streams[stream_id]["status"] = "error"
            self.active_streams[stream_id]["error"] = str(e)
            yield {
                "type": "error",
                "error": str(e)
            }
        finally:
            # Drop tracking state once the stream finishes.
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def _process_chunk(
        self,
        chunk: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Normalize a raw chunk, stamping it with the loop's current time."""
        chunk_type = chunk.get("type")
        if chunk_type == "data":
            return {
                "type": "data",
                "data": chunk.get("batch"),
                "progress": chunk.get("progress"),
                "timestamp": asyncio.get_event_loop().time()
            }
        if chunk_type == "done":
            return {
                "type": "done",
                "total": chunk.get("total"),
                "timestamp": asyncio.get_event_loop().time()
            }
        # Unknown chunk types pass through unchanged.
        return chunk

    def get_stream_status(self, stream_id: str) -> Dict[str, Any]:
        """Status of a live stream, or ``{"status": "not_found"}``."""
        return self.active_streams.get(stream_id, {
            "status": "not_found"
        })


# 3. Stream data aggregator
class StreamAggregator:
    """Buffers a stream's data chunks and reduces them on completion."""

    def __init__(self):
        self.buffers = {}
        self.aggregators = {}

    async def aggregate_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]],
        aggregation_func: callable
    ) -> Dict[str, Any]:
        """Collect "data" chunks until "done", then apply ``aggregation_func``."""
        buffer = []
        self.buffers[stream_id] = buffer
        try:
            async for chunk in stream:
                if chunk.get("type") == "data":
                    buffer.append(chunk.get("data"))
                elif chunk.get("type") == "done":
                    result = aggregation_func(buffer)
                    return {
                        "type": "aggregated",
                        "result": result,
                        "count": len(buffer)
                    }
            return {
                "type": "error",
                "error": "Stream ended without completion"
            }
        finally:
            if stream_id in self.buffers:
                del self.buffers[stream_id]

    def get_buffer(self, stream_id: str) -> list:
        """Snapshot of a live stream's buffered data (empty when unknown)."""
        return self.buffers.get(stream_id, [])


# 4. Stream progress tracking
class StreamProgressTracker:
    """Tracks processed/total counts per stream and estimates time remaining."""

    def __init__(self):
        self.progress = {}

    def track_stream(self, stream_id: str, total_items: int):
        """Start tracking a stream expected to yield ``total_items`` items."""
        now = asyncio.get_event_loop().time()
        self.progress[stream_id] = {
            "total": total_items,
            "processed": 0,
            "start_time": now,
            "last_update": now
        }

    def update_progress(self, stream_id: str, processed: int):
        """Record the absolute number of items processed so far."""
        if stream_id not in self.progress:
            return
        self.progress[stream_id]["processed"] = processed
        self.progress[stream_id]["last_update"] = \
            asyncio.get_event_loop().time()

    def get_progress(self, stream_id: str) -> Dict[str, Any]:
        """Percentage, elapsed time, and remaining-time estimate for a stream."""
        if stream_id not in self.progress:
            return {
                "status": "not_found"
            }
        info = self.progress[stream_id]
        total = info["total"]
        return {
            "total": total,
            "processed": info["processed"],
            # FIX: guard the division — a zero-item stream previously crashed.
            "percentage": (info["processed"] / total) * 100 if total > 0 else 0.0,
            "elapsed": asyncio.get_event_loop().time() - info["start_time"],
            "estimated_remaining": self._estimate_remaining(info)
        }

    def _estimate_remaining(self, info: Dict[str, Any]) -> float:
        """Naive ETA: remaining items divided by the average rate so far."""
        if info["processed"] == 0:
            return 0.0
        elapsed = asyncio.get_event_loop().time() - info["start_time"]
        rate = info["processed"] / elapsed
        if rate == 0:
            return 0.0
        return (info["total"] - info["processed"]) / rate


# 5. Stream error handling
class StreamErrorHandler:
    """Dispatches stream errors to handlers registered per exception type."""

    def __init__(self):
        self.error_handlers = {}  # exception class name -> async handler

    def register_handler(self, error_type: str, handler: callable):
        """Register an async ``handler(error, stream_id)`` for ``error_type``."""
        self.error_handlers[error_type] = handler

    async def handle_error(
        self,
        error: Exception,
        stream_id: str
    ) -> Dict[str, Any]:
        """Run the matching handler, falling back to a generic error payload."""
        error_type = type(error).__name__
        handler = self.error_handlers.get(error_type)
        if handler:
            try:
                result = await handler(error, stream_id)
                return {
                    "type": "handled",
                    "result": result
                }
            except Exception as e:
                return {
                    "type": "error",
                    "error": f"Error handler failed: {str(e)}"
                }
        # Default handling when no handler is registered.
        return {
            "type": "error",
            "error": str(error),
            "error_type": error_type
        }


# 6. Stream resource management
class StreamResourceManager:
    """Tracks per-stream resources so they can be released together."""

    def __init__(self):
        self.resources = {}  # stream_id -> {resource_type: resource}

    def allocate_resource(
        self,
        stream_id: str,
        resource_type: str,
        resource: Any
    ):
        """Attach ``resource`` to a stream under ``resource_type``."""
        if stream_id not in self.resources:
            self.resources[stream_id] = {}
        self.resources[stream_id][resource_type] = resource

    def get_resource(self, stream_id: str, resource_type: str) -> Any:
        """Look up a stream's resource; None when either key is unknown."""
        if stream_id not in self.resources:
            return None
        return self.resources[stream_id].get(resource_type)

    def release_resources(self, stream_id: str):
        """Close and forget every resource attached to ``stream_id``."""
        if stream_id not in self.resources:
            return
        resources = self.resources[stream_id]
        for resource_type, resource in resources.items():
            if hasattr(resource, 'close'):
                resource.close()
            elif hasattr(resource, '__aenter__'):
                # NOTE(review): fire-and-forget async cleanup — requires a
                # running event loop and swallows errors; confirm intent.
                asyncio.create_task(resource.__aexit__(None, None, None))
        del self.resources[stream_id]
# 7. Stream performance monitoring
class StreamPerformanceMonitor:
    """Collects per-stream throughput and error statistics."""

    def __init__(self):
        self.metrics = {}  # stream_id -> raw counters

    def start_monitoring(self, stream_id: str):
        """Begin collecting statistics for a stream."""
        self.metrics[stream_id] = {
            "start_time": asyncio.get_event_loop().time(),
            "chunks": 0,
            "bytes": 0,
            "errors": 0,
        }

    def record_chunk(self, stream_id: str, size: int):
        """Account for one delivered chunk of ``size`` bytes.

        Unknown streams are silently ignored.
        """
        stats = self.metrics.get(stream_id)
        if stats is None:
            return
        stats["chunks"] += 1
        stats["bytes"] += size

    def record_error(self, stream_id: str):
        """Account for one stream error (unknown streams are ignored)."""
        stats = self.metrics.get(stream_id)
        if stats is None:
            return
        stats["errors"] += 1

    def get_metrics(self, stream_id: str) -> Dict[str, Any]:
        """Return throughput/error statistics, or a not-found marker."""
        if stream_id not in self.metrics:
            return {"status": "not_found"}
        stats = self.metrics[stream_id]
        elapsed = asyncio.get_event_loop().time() - stats["start_time"]
        chunk_count = stats["chunks"]
        byte_count = stats["bytes"]
        return {
            "elapsed": elapsed,
            "chunks": chunk_count,
            "bytes": byte_count,
            "errors": stats["errors"],
            "chunks_per_second": chunk_count / elapsed if elapsed > 0 else 0,
            "bytes_per_second": byte_count / elapsed if elapsed > 0 else 0,
            "error_rate": stats["errors"] / chunk_count if chunk_count > 0 else 0,
        }

# Best practices:
# - Chunking: pick a chunk size that suits the network and the data.
# - Progress feedback: report clear progress to improve user experience.
# - Error recovery: build in recovery mechanisms for robustness.
# - Resource management: release stream resources promptly to avoid leaks.
# - Performance monitoring: watch stream metrics to catch problems early.
# - Timeouts: set sensible timeouts to avoid waiting forever.
# With a complete streaming mechanism, large data volumes and long-running
# operations can be handled efficiently.
阅读 0·2月19日 21:34

MCP 的生态系统包含哪些组件和工具?

MCP 的生态系统正在快速发展,包含多个关键组件和工具:核心组件1. MCP SDKPython SDK:官方提供的 Python 实现,包含服务器和客户端库TypeScript/JavaScript SDK:用于 Node.js 和浏览器环境Go SDK:高性能的 Go 语言实现其他语言:社区维护的 Rust、Java、C# 等实现2. MCP 服务器文件系统服务器:提供文件读写、搜索等操作数据库服务器:支持多种数据库(PostgreSQL、MySQL、MongoDB 等)HTTP 服务器:通用的 HTTP API 调用工具Git 服务器:版本控制操作集成SSH 服务器:远程命令执行Slack 服务器:Slack 集成工具3. MCP 客户端Claude Desktop:原生支持 MCP 的桌面应用VS Code 扩展:在编辑器中使用 MCP 工具命令行工具:用于测试和调试 MCP 服务器Web 客户端:浏览器中的 MCP 集成开发工具4. 测试框架MCP Inspector:用于测试和调试 MCP 服务器的工具Mock Server:模拟 MCP 服务器用于单元测试性能测试工具:基准测试和负载测试5. 文档和资源官方文档:完整的协议规范和实现指南示例代码:各种使用场景的示例教程和指南:从入门到高级的教程API 参考:详细的 API 文档社区项目6. 第三方服务器GitHub:开源的 MCP 服务器集合NPM/PyPI:包管理器中的 MCP 相关包社区贡献:由开发者贡献的各种工具7. 集成框架LangChain MCP:LangChain 框架的 MCP 集成LlamaIndex MCP:LlamaIndex 的 MCP 支持AutoGPT MCP:AutoGPT 的 MCP 适配器部署和运维8. 部署工具Docker 镜像:预配置的 MCP 服务器容器Helm Charts:Kubernetes 部署配置Terraform 模块:基础设施即代码9. 监控和日志Prometheus 集成:指标收集和监控Grafana 仪表板:可视化监控ELK Stack:日志聚合和分析学习资源10. 教育资源官方教程:Anthropic 提供的入门教程视频课程:YouTube、Udemy 等平台的课程博客文章:社区分享的技术文章会议演讲:技术会议中的 MCP 相关演讲发展趋势11. 未来方向更多语言支持:扩展到更多编程语言增强安全性:更强大的安全机制性能优化:更高效的协议实现标准化推进:推动成为行业标准如何参与生态系统贡献代码:在 GitHub 上提交 PR编写文档:改进文档和教程分享经验:撰写博客和教程报告问题:提交 Bug 和功能请求参与讨论:加入社区讨论和交流MCP 生态系统的丰富性使其能够满足各种应用场景的需求,也为开发者提供了广阔的参与和贡献空间。
阅读 0·2月19日 21:34

如何在 MCP 中定义工具和验证参数?

MCP 的工具定义和参数验证是确保系统稳定性和可用性的关键部分。以下是详细的实现方法:工具定义结构每个 MCP 工具都需要定义以下属性:{ "name": "tool_name", "description": "工具的详细描述", "inputSchema": { "type": "object", "properties": { "param1": { "type": "string", "description": "参数1的描述" }, "param2": { "type": "number", "description": "参数2的描述", "minimum": 0, "maximum": 100 } }, "required": ["param1"] }}1. 参数类型定义MCP 支持以下参数类型:string:字符串类型number:数字类型(整数或浮点数)integer:整数类型boolean:布尔类型array:数组类型object:对象类型null:空值{ "name": "search_database", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "搜索查询字符串", "minLength": 1, "maxLength": 500 }, "limit": { "type": "integer", "description": "返回结果数量限制", "minimum": 1, "maximum": 100, "default": 10 }, "filters": { "type": "object", "description": "过滤条件", "properties": { "category": {"type": "string"}, "date_range": { "type": "object", "properties": { "start": {"type": "string", "format": "date"}, "end": {"type": "string", "format": "date"} } } } }, "sort_by": { "type": "string", "enum": ["relevance", "date", "popularity"], "description": "排序方式" } }, "required": ["query"] }}2. 
# Parameter-validation implementation
from typing import Any, Dict, List
import re


class ParameterValidator:
    """Validates a parameter dict against a JSON-Schema-like ``inputSchema``."""

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema

    def validate(self, params: Dict[str, Any]) -> tuple[bool, str]:
        """Validate ``params``; return ``(ok, error_message)``."""
        # Required parameters must all be present.
        required = self.schema.get("required", [])
        for param in required:
            if param not in params:
                return False, f"缺少必需参数: {param}"

        # Every supplied parameter must be declared and valid.
        properties = self.schema.get("properties", {})
        for param_name, param_value in params.items():
            if param_name not in properties:
                return False, f"未知参数: {param_name}"
            param_schema = properties[param_name]
            is_valid, error = self._validate_param(
                param_value, param_schema
            )
            if not is_valid:
                return False, f"参数 '{param_name}' 验证失败: {error}"

        return True, ""

    def _validate_param(self, value: Any, schema: Dict[str, Any]) -> tuple[bool, str]:
        """Validate a single value against its schema fragment."""
        param_type = schema.get("type")

        if param_type == "string":
            if not isinstance(value, str):
                return False, "期望字符串类型"
            if "minLength" in schema and len(value) < schema["minLength"]:
                return False, f"字符串长度不能少于 {schema['minLength']}"
            if "maxLength" in schema and len(value) > schema["maxLength"]:
                return False, f"字符串长度不能超过 {schema['maxLength']}"
            if "pattern" in schema:
                if not re.match(schema["pattern"], value):
                    return False, "格式不匹配"

        elif param_type == "number":
            # FIX: bool is a subclass of int in Python, so True/False would
            # otherwise pass as numbers — JSON Schema treats them as booleans.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False, "期望数字类型"
            if "minimum" in schema and value < schema["minimum"]:
                return False, f"数值不能小于 {schema['minimum']}"
            if "maximum" in schema and value > schema["maximum"]:
                return False, f"数值不能大于 {schema['maximum']}"

        elif param_type == "integer":
            # FIX: reject bool here as well (see the "number" branch).
            if isinstance(value, bool) or not isinstance(value, int):
                return False, "期望整数类型"
            if "minimum" in schema and value < schema["minimum"]:
                return False, f"数值不能小于 {schema['minimum']}"
            if "maximum" in schema and value > schema["maximum"]:
                return False, f"数值不能大于 {schema['maximum']}"

        elif param_type == "boolean":
            if not isinstance(value, bool):
                return False, "期望布尔类型"

        elif param_type == "array":
            if not isinstance(value, list):
                return False, "期望数组类型"
            if "minItems" in schema and len(value) < schema["minItems"]:
                return False, f"数组长度不能少于 {schema['minItems']}"
            if "maxItems" in schema and len(value) > schema["maxItems"]:
                return False, f"数组长度不能超过 {schema['maxItems']}"

        elif param_type == "object":
            if not isinstance(value, dict):
                return False, "期望对象类型"

        # Enum membership applies regardless of declared type.
        if "enum" in schema and value not in schema["enum"]:
            return False, f"值必须是以下之一: {schema['enum']}"

        return True, ""


# 3. Tool-description best practices — an example of a good tool description:
EXAMPLE_TOOL_DEFINITION = {
    "name": "execute_sql_query",
    "description": """在数据库中执行 SQL 查询并返回结果。

    此工具支持 SELECT 查询,可以用于检索、聚合和分析数据。
    查询结果将以表格形式返回,包含列名和数据行。

    注意事项:
    - 只支持 SELECT 查询,不支持 INSERT、UPDATE、DELETE
    - 查询超时时间为 30 秒
    - 返回结果最多 1000 行
    """,
    "inputSchema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "SQL 查询语句(仅支持 SELECT)",
                "minLength": 1
            },
            "database": {
                "type": "string",
                "description": "数据库名称",
                "enum": ["production", "staging", "analytics"]
            }
        },
        "required": ["query"]
    }
}


# 4. Advanced parameter validation
class AdvancedValidator(ParameterValidator):
    """Extends the base validator with format checks and custom validators."""

    def _validate_param(self, value: Any, schema: Dict[str, Any]) -> tuple[bool, str]:
        # Run the base-type validation first.
        is_valid, error = super()._validate_param(value, schema)
        if not is_valid:
            return False, error

        # Additional checks layered on top.
        if "format" in schema:
            is_valid, error = self._validate_format(value, schema["format"])
            if not is_valid:
                return False, error
        if "customValidator" in schema:
            is_valid, error = schema["customValidator"](value)
            if not is_valid:
                return False, error

        return True, ""

    def _validate_format(self, value: Any, format_type: str) -> tuple[bool, str]:
        """Check well-known string formats: email, uri, date."""
        if format_type == "email":
            if not re.match(r'^[^@]+@[^@]+\.[^@]+$', value):
                return False, "无效的邮箱格式"
        elif format_type == "uri":
            if not re.match(r'^https?://', value):
                return False, "无效的 URI 格式"
        elif format_type == "date":
            try:
                from datetime import datetime
                datetime.strptime(value, "%Y-%m-%d")
            except ValueError:
                return False, "无效的日期格式 (YYYY-MM-DD)"
        return True, ""
# Error handling
def handle_tool_call(tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Validate ``params`` against the tool's schema and run the tool.

    Returns an envelope dict: ``{"success": True, "result": ...}`` on
    success, otherwise ``{"success": False, "error": ..., "error_code": ...}``.
    """
    try:
        # Look up the tool's declared schema.
        tool = get_tool_definition(tool_name)

        # Reject the call early if the arguments do not match the schema.
        checker = ParameterValidator(tool["inputSchema"])
        ok, problem = checker.validate(params)
        if not ok:
            return {
                "success": False,
                "error": f"参数验证失败: {problem}",
                "error_code": "INVALID_PARAMS",
            }

        # Arguments are valid — execute the tool.
        outcome = execute_tool(tool_name, params)
        return {"success": True, "result": outcome}
    except Exception as e:
        # Any failure during lookup or execution becomes an error envelope.
        return {
            "success": False,
            "error": f"工具执行失败: {str(e)}",
            "error_code": "EXECUTION_ERROR",
        }

# With complete tool definitions and parameter validation, the stability and
# reliability of an MCP system can be assured.
阅读 0·2月19日 21:34

在 MCP 中如何实现错误处理和重试机制?

# Implementing error handling and retries in MCP is key to system stability
# and reliability.
#
# Error categories:
#   - retryable: network timeouts, temporary unavailability, rate limits
#   - non-retryable: bad parameters, insufficient permissions, missing resources
#   - business errors: domain-specific, handled specially
#
# Error response format (JSON-RPC):
# {
#   "jsonrpc": "2.0",
#   "id": "req-123",
#   "error": {
#     "code": -32000,
#     "message": "Server error",
#     "data": {"retryable": true, "retryAfter": 5, "details": "数据库连接超时"}
#   }
# }

from typing import Optional
import asyncio
import time
import random


# 3. Error-handling implementation
class MCPErrorHandler:
    """Classifies MCP error payloads and extracts retry hints."""

    def __init__(self):
        # JSON-RPC error codes considered transient.
        self.retryable_codes = [
            -32000,  # Server error
            -32001,  # Timeout
            -32002   # Rate limit
        ]

    def is_retryable(self, error: dict) -> bool:
        """Return True when the error's code marks it as transient."""
        error_code = error.get("code")
        return error_code in self.retryable_codes

    def get_retry_delay(self, error: dict) -> int:
        """Return the server-suggested retry delay in seconds (default 1)."""
        error_data = error.get("data", {})
        return error_data.get("retryAfter", 1)


# 4. Exponential backoff retry
async def exponential_backoff_retry(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 32.0
):
    """Await ``func()`` up to ``max_retries`` times with exponential backoff.

    The delay doubles per attempt, gets random jitter added, and is capped
    at ``max_delay``.

    Raises:
        ValueError: if ``max_retries`` < 1.
        The last exception from ``func`` once all attempts fail.
    """
    # FIX: the original fell through to ``raise None`` (a TypeError) when
    # max_retries < 1 — reject the nonsensical configuration up front.
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential delay with random jitter, capped at max_delay.
            delay = min(
                base_delay * (2 ** attempt) + random.uniform(0, 1),
                max_delay
            )
            await asyncio.sleep(delay)


# 5. Smart retry strategy
class RetryStrategy:
    """Configurable retry policy: exponential backoff with optional jitter."""

    def __init__(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        jitter: bool = True,
        base_delay: float = 1.0
    ):
        # ``base_delay`` generalizes the previously hard-coded 1-second base;
        # the default keeps the original delays (backoff_factor ** attempt).
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.jitter = jitter
        self.base_delay = base_delay

    async def execute_with_retry(
        self,
        func,
        is_retryable: Optional[callable] = None
    ):
        """Await ``func()``, retrying transient failures per this policy.

        ``is_retryable(exc)`` may veto retries for specific exceptions; the
        final attempt's exception is always propagated.
        """
        # FIX: the original silently returned None when max_retries < 1.
        if self.max_retries < 1:
            raise ValueError("max_retries must be >= 1")

        for attempt in range(self.max_retries):
            try:
                return await func()
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                if is_retryable and not is_retryable(e):
                    raise
                await asyncio.sleep(self._calculate_delay(attempt))

    def _calculate_delay(self, attempt: int) -> float:
        """Backoff delay for the given zero-based attempt index."""
        delay = self.base_delay * (self.backoff_factor ** attempt)
        if self.jitter:
            # Up to 10% random jitter to avoid thundering herds.
            delay += random.uniform(0, delay * 0.1)
        return delay

# 6. Circuit breaker pattern — implementation follows.
# Implementing the circuit breaker
from enum import Enum
import time


class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Stops calling a failing dependency until a recovery window elapses.

    After ``failure_threshold`` consecutive failures the circuit opens;
    once ``recovery_timeout`` seconds pass, a single trial call is allowed
    (half-open) and a success closes the circuit again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, func):
        """Invoke ``func`` through the breaker, tracking success/failure."""
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise Exception("Circuit breaker is OPEN")
            # Recovery window elapsed — allow one trial call.
            self.state = CircuitState.HALF_OPEN
        try:
            outcome = await func()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return outcome

    def _should_attempt_reset(self) -> bool:
        """True once ``recovery_timeout`` has elapsed since the last failure."""
        if self.last_failure_time is None:
            return False
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure counter; close a half-open circuit."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count the failure and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


# Monitoring and logging
# 7. Error monitoring
class ErrorMonitor:
    """Counts errors by type and reports their relative frequency."""

    def __init__(self):
        self.error_counts = {}
        self.error_rates = {}  # kept for interface compatibility (unused here)

    def record_error(self, error_type: str):
        """Increment the counter for ``error_type``."""
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

    def get_error_rate(self, error_type: str) -> float:
        """Share of all recorded errors attributable to ``error_type``."""
        total = sum(self.error_counts.values())
        return self.error_counts.get(error_type, 0) / total if total else 0.0


# Best practices:
# - Distinguish error types: identify retryable vs non-retryable correctly.
# - Tune retry parameters (count, delay) to the business scenario.
# - Use a circuit breaker to prevent cascading failures.
# - Log every error and retry in detail.
# - Monitor error rates in real time and configure alerting.
# - Degrade gracefully when a service is unavailable.
# With these strategies, a robust MCP system can handle all kinds of errors.
阅读 0·2月19日 21:34