
Interview Question Handbook

How do you deploy and operate an MCP system? What are the best practices?

Deployment and operations are critical to running an MCP system reliably in production. Below are detailed deployment strategies and operational best practices.

Deployment architectures

MCP can be deployed in several ways:

- Single-machine deployment: suitable for development and test environments
- Containerized deployment: using Docker containers
- Kubernetes deployment: suitable for large-scale production environments
- Serverless deployment: using AWS Lambda, Azure Functions, etc.

1. Docker containerized deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Start the application
CMD ["python", "-m", "mcp.server", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MCP_HOST=0.0.0.0
      - MCP_PORT=8000
      - LOG_LEVEL=info
      - DATABASE_URL=postgresql://user:pass@db:5432/mcp
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
    depends_on:
      - db
      - redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=mcp
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
```

2. Kubernetes deployment

```yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          image: your-registry/mcp-server:latest
          ports:
            - containerPort: 8000
          env:
            - name: MCP_HOST
              value: "0.0.0.0"
            - name: MCP_PORT
              value: "8000"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: database-url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
spec:
  selector:
    app: mcp-server
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```

3. CI/CD pipeline

```yaml
# .github/workflows/deploy.yml
name: Deploy MCP Server

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest --cov=mcp --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          docker build -t mcp-server:${{ github.sha }} .
      - name: Push to registry
        run: |
          echo ${{ secrets.DOCKER_PASSWORD }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin
          docker tag mcp-server:${{ github.sha }} your-registry/mcp-server:latest
          docker push your-registry/mcp-server:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to Kubernetes
        uses: azure/k8s-deploy@v4
        with:
          manifests: |
            k8s/deployment.yaml
          images: |
            your-registry/mcp-server:latest
          kubeconfig: ${{ secrets.KUBE_CONFIG }}
```

4. Monitoring and logging

```python
# monitoring.py
import logging
from logging.handlers import RotatingFileHandler

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus metrics
REQUEST_COUNT = Counter('mcp_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')
ACTIVE_CONNECTIONS = Gauge('mcp_active_connections', 'Active connections')
ERROR_COUNT = Counter('mcp_errors_total', 'Total errors', ['error_type'])

# Logging configuration
def setup_logging():
    logger = logging.getLogger('mcp')
    logger.setLevel(logging.INFO)

    # File handler
    file_handler = RotatingFileHandler(
        'logs/mcp.log',
        maxBytes=10*1024*1024,  # 10 MB
        backupCount=5
    )
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

# Start the metrics server
def start_metrics_server(port: int = 9090):
    start_http_server(port)
    logging.info(f"Metrics server started on port {port}")
```

5. Configuration management

```python
# config.py
from pydantic import BaseSettings, Field

class MCPSettings(BaseSettings):
    # Server
    host: str = Field(default="0.0.0.0", env="MCP_HOST")
    port: int = Field(default=8000, env="MCP_PORT")

    # Database
    database_url: str = Field(..., env="DATABASE_URL")
    database_pool_size: int = Field(default=10, env="DATABASE_POOL_SIZE")

    # Redis
    redis_url: str = Field(default="redis://localhost:6379", env="REDIS_URL")

    # Logging
    log_level: str = Field(default="INFO", env="LOG_LEVEL")
    log_file: str = Field(default="logs/mcp.log", env="LOG_FILE")

    # Security
    secret_key: str = Field(..., env="SECRET_KEY")
    jwt_algorithm: str = Field(default="HS256", env="JWT_ALGORITHM")

    # Performance
    max_connections: int = Field(default=100, env="MAX_CONNECTIONS")
    request_timeout: int = Field(default=30, env="REQUEST_TIMEOUT")

    # Cache
    cache_ttl: int = Field(default=3600, env="CACHE_TTL")

    class Config:
        env_file = ".env"
        case_sensitive = False

# Load the settings
settings = MCPSettings()
```

6. Backup and restore

```bash
#!/bin/bash
# backup.sh

# Back up the database
backup_database() {
    echo "Backing up database..."
    pg_dump $DATABASE_URL > backups/db_$(date +%Y%m%d_%H%M%S).sql
    echo "Database backup completed"
}

# Back up configuration
backup_config() {
    echo "Backing up configuration..."
    tar -czf backups/config_$(date +%Y%m%d_%H%M%S).tar.gz config/
    echo "Configuration backup completed"
}

# Back up logs
backup_logs() {
    echo "Backing up logs..."
    tar -czf backups/logs_$(date +%Y%m%d_%H%M%S).tar.gz logs/
    echo "Logs backup completed"
}

# Remove old backups
cleanup_old_backups() {
    echo "Cleaning up old backups (older than 7 days)..."
    find backups/ -name "*.sql" -mtime +7 -delete
    find backups/ -name "*.tar.gz" -mtime +7 -delete
    echo "Cleanup completed"
}

# Main
main() {
    mkdir -p backups
    backup_database
    backup_config
    backup_logs
    cleanup_old_backups
    echo "All backups completed successfully"
}

main
```

7. Troubleshooting

```python
# diagnostics.py
import asyncio
import logging
from typing import Dict, Any

import psutil

from config import settings  # MCPSettings instance from config.py above

class SystemDiagnostics:
    @staticmethod
    def get_system_info() -> Dict[str, Any]:
        """Collect basic system metrics."""
        return {
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory": {
                "total": psutil.virtual_memory().total,
                "available": psutil.virtual_memory().available,
                "percent": psutil.virtual_memory().percent
            },
            "disk": {
                "total": psutil.disk_usage('/').total,
                "used": psutil.disk_usage('/').used,
                "percent": psutil.disk_usage('/').percent
            },
            "network": {
                "connections": len(psutil.net_connections()),
                "io_counters": psutil.net_io_counters()._asdict()
            }
        }

    @staticmethod
    async def check_database_connection(db_url: str) -> bool:
        """Check database connectivity."""
        try:
            # Implement the actual database connectivity check here
            return True
        except Exception as e:
            logging.error(f"Database connection failed: {e}")
            return False

    @staticmethod
    async def check_redis_connection(redis_url: str) -> bool:
        """Check Redis connectivity."""
        try:
            # Implement the actual Redis connectivity check here
            return True
        except Exception as e:
            logging.error(f"Redis connection failed: {e}")
            return False

    @staticmethod
    def get_service_status() -> Dict[str, bool]:
        """Report the status of each dependency."""
        return {
            "database": asyncio.run(SystemDiagnostics.check_database_connection(settings.database_url)),
            "redis": asyncio.run(SystemDiagnostics.check_redis_connection(settings.redis_url)),
            "api": True  # If this code is running, the API service itself is up
        }
```

Best practices:

- Containerization: use Docker containers to guarantee environment consistency
- Automated deployment: automate releases with a CI/CD pipeline
- Monitoring and alerting: implement comprehensive monitoring and alerting
- Centralized logging: aggregate logs to simplify analysis and troubleshooting
- Backup strategy: back up important data and configuration regularly
- Disaster recovery: write and rehearse a disaster recovery plan
- Security hardening: apply security hardening measures
- Performance tuning: continuously monitor and optimize system performance

With a solid deployment and operations strategy, an MCP system can run reliably in production.
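For environments where `find`/cron is unavailable, the 7-day retention rule from backup.sh can be mirrored in Python. A minimal sketch, assuming the same `backups/` layout as the script above (the function name is illustrative):

```python
import time
from pathlib import Path

def cleanup_old_backups(backup_dir: str, max_age_days: int = 7) -> int:
    """Delete .sql and .tar.gz backups older than max_age_days; return how many were removed."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    for path in Path(backup_dir).iterdir():
        # Only touch backup artifacts, mirroring the find patterns in backup.sh
        if not (path.name.endswith(".sql") or path.name.endswith(".tar.gz")):
            continue
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```

Run it from the same scheduler that triggers the backups, after the backup steps complete.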
Views: 0 · Feb 19, 21:35

How does resource management work in MCP?

MCP's resource management mechanism lets an LLM access and operate on external resources such as files, database records, and API endpoints. The implementation details follow.

Resource definition

MCP resources are identified and accessed via a URI (Uniform Resource Identifier):

```json
{
  "uri": "file:///path/to/resource",
  "name": "Resource name",
  "description": "Resource description",
  "mimeType": "text/plain"
}
```

1. Resource types

MCP supports several resource types:

- File resources: file:///path/to/file
- HTTP resources: http://example.com/api/resource
- Database resources: db://database/table/id
- Custom resources: custom://resource-type/id

2. Resource registration

```python
import json

from mcp.server import Server
from mcp.types import Resource

server = Server("my-mcp-server")

@server.resource(
    uri="file:///config/app.json",
    name="Application config",
    description="The application's configuration file",
    mimeType="application/json"
)
async def get_app_config() -> str:
    """Return the application configuration."""
    return """
    {
        "name": "MyApp",
        "version": "1.0.0",
        "settings": {
            "debug": false,
            "maxConnections": 100
        }
    }
    """

@server.resource(
    uri="db://users/{id}",
    name="User info",
    description="Detailed user information",
    mimeType="application/json"
)
async def get_user(id: str) -> str:
    """Return user information."""
    user = await database.get_user(id)
    return json.dumps(user)
```

3. Resource access control

```python
import re

class ResourceAccessControl:
    def __init__(self):
        self.permissions = {}
        self.acl = {}

    def grant_permission(self, user: str, resource_pattern: str, access: str):
        """Grant a user access to resources matching a pattern."""
        if user not in self.permissions:
            self.permissions[user] = []
        self.permissions[user].append({
            "pattern": resource_pattern,
            "access": access  # "read", "write", "delete"
        })

    def check_permission(self, user: str, resource_uri: str, access: str) -> bool:
        """Check whether a user may access a resource."""
        if user not in self.permissions:
            return False
        for perm in self.permissions[user]:
            if self._match_pattern(perm["pattern"], resource_uri):
                if access in perm["access"] or perm["access"] == "all":
                    return True
        return False

    def _match_pattern(self, pattern: str, uri: str) -> bool:
        """Match a resource URI against a wildcard pattern."""
        # Translate wildcards into a regular expression
        regex = pattern.replace("*", ".*").replace("?", ".")
        return re.match(regex, uri) is not None
```

4. Resource caching

```python
import hashlib
from datetime import datetime, timedelta
from typing import Optional

class ResourceCache:
    def __init__(self, ttl: int = 3600):
        self.cache = {}
        self.ttl = ttl

    def get(self, resource_uri: str) -> Optional[str]:
        """Return a cached resource, or None if absent or expired."""
        if resource_uri not in self.cache:
            return None
        entry = self.cache[resource_uri]
        # Evict expired entries
        if datetime.now() > entry["expires"]:
            del self.cache[resource_uri]
            return None
        return entry["data"]

    def set(self, resource_uri: str, data: str):
        """Cache a resource."""
        self.cache[resource_uri] = {
            "data": data,
            "expires": datetime.now() + timedelta(seconds=self.ttl),
            "hash": hashlib.md5(data.encode()).hexdigest()
        }

    def invalidate(self, resource_uri: str):
        """Invalidate a cached resource."""
        if resource_uri in self.cache:
            del self.cache[resource_uri]

    def clear(self):
        """Clear the whole cache."""
        self.cache.clear()
```

5. Resource versioning

```python
from datetime import datetime
from typing import List, Optional

class ResourceVersionManager:
    def __init__(self):
        self.versions = {}

    def save_version(self, resource_uri: str, data: str, version: str):
        """Save a version of a resource."""
        if resource_uri not in self.versions:
            self.versions[resource_uri] = {}
        self.versions[resource_uri][version] = {
            "data": data,
            "timestamp": datetime.now().isoformat()
        }

    def get_version(self, resource_uri: str, version: str) -> Optional[str]:
        """Return a specific version."""
        if resource_uri not in self.versions:
            return None
        return self.versions[resource_uri].get(version, {}).get("data")

    def list_versions(self, resource_uri: str) -> List[str]:
        """List all versions of a resource."""
        if resource_uri not in self.versions:
            return []
        return list(self.versions[resource_uri].keys())

    def get_latest_version(self, resource_uri: str) -> Optional[str]:
        """Return the latest version."""
        versions = self.list_versions(resource_uri)
        if not versions:
            return None
        return max(versions)
```

6. Resource monitoring

```python
from datetime import datetime
from typing import List

class ResourceMonitor:
    def __init__(self):
        self.access_log = []
        self.metrics = {
            "total_access": 0,
            "unique_resources": set(),
            "access_by_type": {}
        }

    def log_access(self, resource_uri: str, user: str, action: str):
        """Record a resource access."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "resource": resource_uri,
            "user": user,
            "action": action
        }
        self.access_log.append(log_entry)

        # Update aggregate metrics
        self.metrics["total_access"] += 1
        self.metrics["unique_resources"].add(resource_uri)

        # Count accesses per URI scheme
        resource_type = resource_uri.split("://")[0]
        if resource_type not in self.metrics["access_by_type"]:
            self.metrics["access_by_type"][resource_type] = 0
        self.metrics["access_by_type"][resource_type] += 1

    def get_metrics(self) -> dict:
        """Return monitoring metrics."""
        return {
            "total_access": self.metrics["total_access"],
            "unique_resources": len(self.metrics["unique_resources"]),
            "access_by_type": self.metrics["access_by_type"]
        }

    def get_access_history(self, resource_uri: str, limit: int = 100) -> List[dict]:
        """Return the access history for a resource."""
        filtered = [
            log for log in self.access_log
            if log["resource"] == resource_uri
        ]
        return filtered[-limit:]
```

7. Resource lifecycle management

```python
from datetime import datetime, timedelta

class ResourceLifecycleManager:
    def __init__(self):
        self.resources = {}
        self.cleanup_interval = 3600  # 1 hour

    def register_resource(self, resource_uri: str, metadata: dict):
        """Register a resource."""
        self.resources[resource_uri] = {
            "metadata": metadata,
            "created_at": datetime.now(),
            "last_accessed": datetime.now(),
            "access_count": 0
        }

    def access_resource(self, resource_uri: str):
        """Record an access to a resource."""
        if resource_uri in self.resources:
            self.resources[resource_uri]["last_accessed"] = datetime.now()
            self.resources[resource_uri]["access_count"] += 1

    def cleanup_old_resources(self, max_age_days: int = 30):
        """Remove resources that have not been accessed recently."""
        cutoff = datetime.now() - timedelta(days=max_age_days)
        to_remove = []
        for uri, info in self.resources.items():
            if info["last_accessed"] < cutoff:
                to_remove.append(uri)
        for uri in to_remove:
            del self.resources[uri]
        return len(to_remove)

    def get_resource_stats(self) -> dict:
        """Return aggregate resource statistics."""
        return {
            "total_resources": len(self.resources),
            "total_accesses": sum(r["access_count"] for r in self.resources.values()),
            "oldest_resource": min(
                (r["created_at"] for r in self.resources.values()),
                default=None
            )
        }
```

Best practices:

- URI design: use clear, hierarchical URI structures
- Access control: apply the principle of least privilege
- Caching strategy: set TTLs appropriate to each resource's characteristics
- Monitoring and logging: record every resource access
- Versioning: version important resources
- Regular cleanup: remove resources that are no longer used

With a sound resource management mechanism, resources in an MCP system can be accessed securely and efficiently.
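The hand-rolled wildcard matching in `_match_pattern` can also be expressed with the standard library's `fnmatch`, which handles `*` and `?` directly. A small sketch (the URIs and pattern are illustrative):

```python
from fnmatch import fnmatch

# Grant read access to everything under file:///config/
pattern = "file:///config/*"

print(fnmatch("file:///config/app.json", pattern))   # → True
print(fnmatch("file:///secrets/key.pem", pattern))   # → False
```

Unlike path globbing, `fnmatch` does not treat `/` specially, so a single `*` here matches across the rest of the URI; use a regex as in `_match_pattern` if you need finer control.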

How do you implement streaming in MCP?

MCP's streaming capability allows real-time transfer of large data volumes or the results of long-running operations. The implementation details follow.

Streaming basics

MCP supports two streaming modes:

- Server-push streams: the server actively pushes data
- Client-requested streams: the client asks for a streamed response

1. Streaming tool definition

```python
import asyncio
from typing import AsyncIterator

from mcp.server import Server
from mcp.types import Tool

server = Server("my-mcp-server")

@server.tool(
    name="stream_data",
    description="Stream back a large amount of data"
)
async def stream_data(
    count: int,
    batch_size: int = 10
) -> AsyncIterator[dict]:
    """Generate data in batches."""
    for i in range(0, count, batch_size):
        batch = list(range(i, min(i + batch_size, count)))
        # Emit one batch
        yield {
            "type": "data",
            "batch": batch,
            "progress": (i + batch_size) / count
        }
        # Simulate processing latency
        await asyncio.sleep(0.1)
    # Signal completion
    yield {
        "type": "done",
        "total": count
    }
```

2. Stream response processor

```python
import asyncio
from typing import AsyncIterator, Dict, Any

class StreamProcessor:
    def __init__(self):
        self.active_streams = {}

    async def process_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Process a streaming response."""
        self.active_streams[stream_id] = {
            "status": "active",
            "start_time": asyncio.get_event_loop().time()
        }
        try:
            async for chunk in stream:
                # Process each chunk
                processed = await self._process_chunk(chunk)
                # Update the stream's status
                if processed.get("type") == "done":
                    self.active_streams[stream_id]["status"] = "completed"
                yield processed
        except Exception as e:
            self.active_streams[stream_id]["status"] = "error"
            self.active_streams[stream_id]["error"] = str(e)
            yield {
                "type": "error",
                "error": str(e)
            }
        finally:
            # Clean up the stream
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def _process_chunk(
        self,
        chunk: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Process a single chunk."""
        chunk_type = chunk.get("type")
        if chunk_type == "data":
            return {
                "type": "data",
                "data": chunk.get("batch"),
                "progress": chunk.get("progress"),
                "timestamp": asyncio.get_event_loop().time()
            }
        elif chunk_type == "done":
            return {
                "type": "done",
                "total": chunk.get("total"),
                "timestamp": asyncio.get_event_loop().time()
            }
        return chunk

    def get_stream_status(self, stream_id: str) -> Dict[str, Any]:
        """Return the status of a stream."""
        return self.active_streams.get(stream_id, {
            "status": "not_found"
        })
```

3. Stream data aggregator

```python
from typing import Any, AsyncIterator, Callable, Dict

class StreamAggregator:
    def __init__(self):
        self.buffers = {}
        self.aggregators = {}

    async def aggregate_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]],
        aggregation_func: Callable
    ) -> Dict[str, Any]:
        """Aggregate the chunks of a stream into a single result."""
        buffer = []
        self.buffers[stream_id] = buffer
        try:
            async for chunk in stream:
                if chunk.get("type") == "data":
                    buffer.append(chunk.get("data"))
                elif chunk.get("type") == "done":
                    # Run the aggregation
                    result = aggregation_func(buffer)
                    return {
                        "type": "aggregated",
                        "result": result,
                        "count": len(buffer)
                    }
            return {
                "type": "error",
                "error": "Stream ended without completion"
            }
        finally:
            if stream_id in self.buffers:
                del self.buffers[stream_id]

    def get_buffer(self, stream_id: str) -> list:
        """Return the buffered data for a stream."""
        return self.buffers.get(stream_id, [])
```

4. Stream progress tracking

```python
import asyncio
from typing import Any, Dict

class StreamProgressTracker:
    def __init__(self):
        self.progress = {}

    def track_stream(self, stream_id: str, total_items: int):
        """Start tracking a stream's progress."""
        now = asyncio.get_event_loop().time()
        self.progress[stream_id] = {
            "total": total_items,
            "processed": 0,
            "start_time": now,
            "last_update": now
        }

    def update_progress(self, stream_id: str, processed: int):
        """Update the progress."""
        if stream_id not in self.progress:
            return
        self.progress[stream_id]["processed"] = processed
        self.progress[stream_id]["last_update"] = asyncio.get_event_loop().time()

    def get_progress(self, stream_id: str) -> Dict[str, Any]:
        """Return progress information."""
        if stream_id not in self.progress:
            return {"status": "not_found"}
        info = self.progress[stream_id]
        return {
            "total": info["total"],
            "processed": info["processed"],
            "percentage": (info["processed"] / info["total"]) * 100,
            "elapsed": asyncio.get_event_loop().time() - info["start_time"],
            "estimated_remaining": self._estimate_remaining(info)
        }

    def _estimate_remaining(self, info: Dict[str, Any]) -> float:
        """Estimate the remaining time."""
        if info["processed"] == 0:
            return 0.0
        elapsed = asyncio.get_event_loop().time() - info["start_time"]
        rate = info["processed"] / elapsed
        if rate == 0:
            return 0.0
        remaining = (info["total"] - info["processed"]) / rate
        return remaining
```

5. Stream error handling

```python
from typing import Any, Callable, Dict

class StreamErrorHandler:
    def __init__(self):
        self.error_handlers = {}

    def register_handler(self, error_type: str, handler: Callable):
        """Register an error handler for an error type."""
        self.error_handlers[error_type] = handler

    async def handle_error(self, error: Exception, stream_id: str) -> Dict[str, Any]:
        """Handle a stream error."""
        error_type = type(error).__name__

        # Look up a matching handler
        handler = self.error_handlers.get(error_type)
        if handler:
            try:
                result = await handler(error, stream_id)
                return {
                    "type": "handled",
                    "result": result
                }
            except Exception as e:
                return {
                    "type": "error",
                    "error": f"Error handler failed: {str(e)}"
                }

        # Default error handling
        return {
            "type": "error",
            "error": str(error),
            "error_type": error_type
        }
```

6. Stream resource management

```python
import asyncio
from typing import Any

class StreamResourceManager:
    def __init__(self):
        self.resources = {}

    def allocate_resource(self, stream_id: str, resource_type: str, resource: Any):
        """Attach a resource to a stream."""
        if stream_id not in self.resources:
            self.resources[stream_id] = {}
        self.resources[stream_id][resource_type] = resource

    def get_resource(self, stream_id: str, resource_type: str) -> Any:
        """Return a resource attached to a stream."""
        if stream_id not in self.resources:
            return None
        return self.resources[stream_id].get(resource_type)

    def release_resources(self, stream_id: str):
        """Release all resources attached to a stream."""
        if stream_id not in self.resources:
            return
        resources = self.resources[stream_id]
        # Clean up each resource
        for resource_type, resource in resources.items():
            if hasattr(resource, 'close'):
                resource.close()
            elif hasattr(resource, '__aenter__'):
                asyncio.create_task(resource.__aexit__(None, None, None))
        del self.resources[stream_id]
```

7. Stream performance monitoring

```python
import asyncio
from typing import Any, Dict

class StreamPerformanceMonitor:
    def __init__(self):
        self.metrics = {}

    def start_monitoring(self, stream_id: str):
        """Start monitoring a stream."""
        self.metrics[stream_id] = {
            "start_time": asyncio.get_event_loop().time(),
            "chunks": 0,
            "bytes": 0,
            "errors": 0
        }

    def record_chunk(self, stream_id: str, size: int):
        """Record a chunk."""
        if stream_id not in self.metrics:
            return
        self.metrics[stream_id]["chunks"] += 1
        self.metrics[stream_id]["bytes"] += size

    def record_error(self, stream_id: str):
        """Record an error."""
        if stream_id not in self.metrics:
            return
        self.metrics[stream_id]["errors"] += 1

    def get_metrics(self, stream_id: str) -> Dict[str, Any]:
        """Return performance metrics."""
        if stream_id not in self.metrics:
            return {"status": "not_found"}
        metrics = self.metrics[stream_id]
        elapsed = asyncio.get_event_loop().time() - metrics["start_time"]
        return {
            "elapsed": elapsed,
            "chunks": metrics["chunks"],
            "bytes": metrics["bytes"],
            "errors": metrics["errors"],
            "chunks_per_second": metrics["chunks"] / elapsed if elapsed > 0 else 0,
            "bytes_per_second": metrics["bytes"] / elapsed if elapsed > 0 else 0,
            "error_rate": metrics["errors"] / metrics["chunks"] if metrics["chunks"] > 0 else 0
        }
```

Best practices:

- Sensible chunking: choose a batch size appropriate for network conditions and the data
- Progress feedback: report clear progress information to improve the user experience
- Error recovery: implement recovery mechanisms to improve robustness
- Resource management: release stream resources promptly to avoid memory leaks
- Performance monitoring: monitor stream performance to catch problems early
- Timeout control: set reasonable timeouts to avoid waiting forever

With a sound streaming mechanism, large data volumes and long-running operations can be handled efficiently.
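The timeout-control practice above can be implemented with `asyncio.wait_for` around each chunk, so a stalled producer aborts the stream instead of hanging the consumer. A minimal self-contained sketch (the toy stream and helper names are illustrative):

```python
import asyncio

async def slow_stream():
    # A toy stream that yields one chunk every 10 ms
    for i in range(5):
        await asyncio.sleep(0.01)
        yield {"type": "data", "batch": [i]}

async def consume_with_timeout(stream, per_chunk_timeout: float):
    """Consume a stream, raising TimeoutError if any single chunk stalls."""
    chunks = []
    it = stream.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(it.__anext__(), per_chunk_timeout)
        except StopAsyncIteration:
            break  # stream finished normally
        chunks.append(chunk)
    return chunks

chunks = asyncio.run(consume_with_timeout(slow_stream(), per_chunk_timeout=1.0))
print(len(chunks))  # → 5
```

A per-chunk timeout is usually preferable to one timeout over the whole stream, since long streams have no meaningful total duration bound.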

What components and tools make up the MCP ecosystem?

The MCP ecosystem is growing quickly and includes several key components and tools.

Core components

1. MCP SDKs

- Python SDK: the official Python implementation, including server and client libraries
- TypeScript/JavaScript SDK: for Node.js and browser environments
- Go SDK: a high-performance Go implementation
- Other languages: community-maintained Rust, Java, C#, and other implementations

2. MCP servers

- Filesystem server: file read/write, search, and related operations
- Database servers: support for multiple databases (PostgreSQL, MySQL, MongoDB, etc.)
- HTTP server: a general-purpose HTTP API calling tool
- Git server: version control integration
- SSH server: remote command execution
- Slack server: Slack integration tools

3. MCP clients

- Claude Desktop: a desktop application with native MCP support
- VS Code extension: use MCP tools inside the editor
- Command-line tools: for testing and debugging MCP servers
- Web clients: MCP integration in the browser

Development tools

4. Testing frameworks

- MCP Inspector: a tool for testing and debugging MCP servers
- Mock servers: simulated MCP servers for unit testing
- Performance testing tools: benchmarking and load testing

5. Documentation and resources

- Official documentation: the complete protocol specification and implementation guides
- Example code: samples covering a range of use cases
- Tutorials and guides: from beginner to advanced
- API reference: detailed API documentation

Community projects

6. Third-party servers

- GitHub: collections of open-source MCP servers
- NPM/PyPI: MCP-related packages in the package registries
- Community contributions: tools contributed by developers

7. Integration frameworks

- LangChain MCP: MCP integration for the LangChain framework
- LlamaIndex MCP: MCP support in LlamaIndex
- AutoGPT MCP: an MCP adapter for AutoGPT

Deployment and operations

8. Deployment tools

- Docker images: preconfigured MCP server containers
- Helm charts: Kubernetes deployment configuration
- Terraform modules: infrastructure as code

9. Monitoring and logging

- Prometheus integration: metrics collection and monitoring
- Grafana dashboards: monitoring visualization
- ELK Stack: log aggregation and analysis

Learning resources

10. Educational resources

- Official tutorials: introductory tutorials from Anthropic
- Video courses: courses on YouTube, Udemy, and similar platforms
- Blog posts: technical articles shared by the community
- Conference talks: MCP-related talks at technical conferences

Trends

11. Future directions

- Broader language support: expansion to more programming languages
- Stronger security: more robust security mechanisms
- Performance optimization: more efficient protocol implementations
- Standardization: progress toward becoming an industry standard

How to participate in the ecosystem

- Contribute code: submit PRs on GitHub
- Write documentation: improve docs and tutorials
- Share experience: write blog posts and tutorials
- Report issues: file bugs and feature requests
- Join the discussion: take part in community discussions

The richness of the MCP ecosystem lets it cover a wide range of application scenarios, and it gives developers ample room to participate and contribute.

How do you define tools and validate parameters in MCP?

Tool definitions and parameter validation are key to keeping an MCP system stable and usable. The implementation details follow.

Tool definition structure

Every MCP tool defines the following properties:

```json
{
  "name": "tool_name",
  "description": "A detailed description of the tool",
  "inputSchema": {
    "type": "object",
    "properties": {
      "param1": {
        "type": "string",
        "description": "Description of param1"
      },
      "param2": {
        "type": "number",
        "description": "Description of param2",
        "minimum": 0,
        "maximum": 100
      }
    },
    "required": ["param1"]
  }
}
```

1. Parameter types

MCP supports the following parameter types:

- string: string values
- number: numeric values (integer or float)
- integer: integer values
- boolean: boolean values
- array: arrays
- object: objects
- null: the null value

```json
{
  "name": "search_database",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "The search query string",
        "minLength": 1,
        "maxLength": 500
      },
      "limit": {
        "type": "integer",
        "description": "Maximum number of results to return",
        "minimum": 1,
        "maximum": 100,
        "default": 10
      },
      "filters": {
        "type": "object",
        "description": "Filter conditions",
        "properties": {
          "category": {"type": "string"},
          "date_range": {
            "type": "object",
            "properties": {
              "start": {"type": "string", "format": "date"},
              "end": {"type": "string", "format": "date"}
            }
          }
        }
      },
      "sort_by": {
        "type": "string",
        "enum": ["relevance", "date", "popularity"],
        "description": "Sort order"
      }
    },
    "required": ["query"]
  }
}
```

2. Parameter validation implementation

```python
import re
from typing import Any, Dict, List

class ParameterValidator:
    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema

    def validate(self, params: Dict[str, Any]) -> tuple[bool, str]:
        """Validate a parameter dict against the schema."""
        # Check required parameters
        required = self.schema.get("required", [])
        for param in required:
            if param not in params:
                return False, f"Missing required parameter: {param}"

        # Validate each parameter
        properties = self.schema.get("properties", {})
        for param_name, param_value in params.items():
            if param_name not in properties:
                return False, f"Unknown parameter: {param_name}"
            param_schema = properties[param_name]
            is_valid, error = self._validate_param(param_value, param_schema)
            if not is_valid:
                return False, f"Validation failed for parameter '{param_name}': {error}"

        return True, ""

    def _validate_param(self, value: Any, schema: Dict[str, Any]) -> tuple[bool, str]:
        """Validate a single parameter."""
        param_type = schema.get("type")

        # Type checks
        if param_type == "string":
            if not isinstance(value, str):
                return False, "expected a string"
            # String length checks
            if "minLength" in schema and len(value) < schema["minLength"]:
                return False, f"string must be at least {schema['minLength']} characters"
            if "maxLength" in schema and len(value) > schema["maxLength"]:
                return False, f"string must be at most {schema['maxLength']} characters"
            # Regex check
            if "pattern" in schema:
                if not re.match(schema["pattern"], value):
                    return False, "value does not match the expected pattern"

        elif param_type == "number":
            if not isinstance(value, (int, float)):
                return False, "expected a number"
            if "minimum" in schema and value < schema["minimum"]:
                return False, f"value must be at least {schema['minimum']}"
            if "maximum" in schema and value > schema["maximum"]:
                return False, f"value must be at most {schema['maximum']}"

        elif param_type == "integer":
            if not isinstance(value, int):
                return False, "expected an integer"

        elif param_type == "boolean":
            if not isinstance(value, bool):
                return False, "expected a boolean"

        elif param_type == "array":
            if not isinstance(value, list):
                return False, "expected an array"
            if "minItems" in schema and len(value) < schema["minItems"]:
                return False, f"array must have at least {schema['minItems']} items"
            if "maxItems" in schema and len(value) > schema["maxItems"]:
                return False, f"array must have at most {schema['maxItems']} items"

        elif param_type == "object":
            if not isinstance(value, dict):
                return False, "expected an object"

        # Enum check
        if "enum" in schema and value not in schema["enum"]:
            return False, f"value must be one of: {schema['enum']}"

        return True, ""
```

3. Tool description best practices

An example of a good tool description:

```json
{
  "name": "execute_sql_query",
  "description": "Execute a SQL query against the database and return the results. This tool supports SELECT queries for retrieving, aggregating, and analyzing data. Results are returned as a table with column names and data rows. Notes: only SELECT queries are supported (no INSERT, UPDATE, DELETE); the query timeout is 30 seconds; at most 1000 rows are returned.",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "The SQL query (SELECT only)",
        "minLength": 1
      },
      "database": {
        "type": "string",
        "description": "Database name",
        "enum": ["production", "staging", "analytics"]
      }
    },
    "required": ["query"]
  }
}
```

4. Advanced parameter validation

```python
import re
from typing import Any, Dict

class AdvancedValidator(ParameterValidator):
    def _validate_param(self, value: Any, schema: Dict[str, Any]) -> tuple[bool, str]:
        # Run the base validation first
        is_valid, error = super()._validate_param(value, schema)
        if not is_valid:
            return False, error

        # Custom validation
        if "format" in schema:
            is_valid, error = self._validate_format(value, schema["format"])
            if not is_valid:
                return False, error

        if "customValidator" in schema:
            is_valid, error = schema["customValidator"](value)
            if not is_valid:
                return False, error

        return True, ""

    def _validate_format(self, value: Any, format_type: str) -> tuple[bool, str]:
        """Validate well-known formats."""
        if format_type == "email":
            if not re.match(r'^[^@]+@[^@]+\.[^@]+$', value):
                return False, "invalid email format"
        elif format_type == "uri":
            if not re.match(r'^https?://', value):
                return False, "invalid URI format"
        elif format_type == "date":
            try:
                from datetime import datetime
                datetime.strptime(value, "%Y-%m-%d")
            except ValueError:
                return False, "invalid date format (YYYY-MM-DD)"
        return True, ""
```

5. Error handling

```python
from typing import Any, Dict

def handle_tool_call(tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Handle a tool call end to end."""
    try:
        # Look up the tool definition
        tool = get_tool_definition(tool_name)

        # Validate the parameters
        validator = ParameterValidator(tool["inputSchema"])
        is_valid, error = validator.validate(params)
        if not is_valid:
            return {
                "success": False,
                "error": f"Parameter validation failed: {error}",
                "error_code": "INVALID_PARAMS"
            }

        # Execute the tool
        result = execute_tool(tool_name, params)
        return {
            "success": True,
            "result": result
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Tool execution failed: {str(e)}",
            "error_code": "EXECUTION_ERROR"
        }
```

With sound tool definitions and parameter validation, an MCP system stays stable and reliable.

How do you implement error handling and retry mechanisms in MCP?

Implementing error handling and retries in MCP is key to system stability and reliability. The main strategies are described below.

Error handling strategy

1. Error classification

- Retryable errors: network timeouts, temporarily unavailable services, rate limiting, etc.
- Non-retryable errors: invalid parameters, insufficient permissions, missing resources, etc.
- Business errors: errors tied to business logic that need special handling

2. Error response format

```json
{
  "jsonrpc": "2.0",
  "id": "req-123",
  "error": {
    "code": -32000,
    "message": "Server error",
    "data": {
      "retryable": true,
      "retryAfter": 5,
      "details": "Database connection timed out"
    }
  }
}
```

3. Error handler implementation

```python
class MCPErrorHandler:
    def __init__(self):
        self.retryable_codes = [
            -32000,  # Server error
            -32001,  # Timeout
            -32002   # Rate limit
        ]

    def is_retryable(self, error: dict) -> bool:
        """Decide whether an error is retryable."""
        error_code = error.get("code")
        return error_code in self.retryable_codes

    def get_retry_delay(self, error: dict) -> int:
        """Return the delay before the next retry."""
        error_data = error.get("data", {})
        return error_data.get("retryAfter", 1)
```

Retry mechanisms

4. Exponential backoff retry

```python
import asyncio
import random

async def exponential_backoff_retry(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 32.0
):
    """Retry a coroutine with exponential backoff."""
    last_exception = None
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            if attempt == max_retries - 1:
                raise
            # Compute the delay (with random jitter)
            delay = min(
                base_delay * (2 ** attempt) + random.uniform(0, 1),
                max_delay
            )
            await asyncio.sleep(delay)
    raise last_exception
```

5. Smart retry strategy

```python
import asyncio
import random
from typing import Callable, Optional

class RetryStrategy:
    def __init__(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.jitter = jitter

    async def execute_with_retry(
        self,
        func,
        is_retryable: Optional[Callable] = None
    ):
        """Execute a coroutine under the retry strategy."""
        for attempt in range(self.max_retries):
            try:
                return await func()
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                if is_retryable and not is_retryable(e):
                    raise
                delay = self._calculate_delay(attempt)
                await asyncio.sleep(delay)

    def _calculate_delay(self, attempt: int) -> float:
        """Compute the retry delay."""
        delay = self.backoff_factor ** attempt
        if self.jitter:
            delay += random.uniform(0, delay * 0.1)
        return delay
```

Circuit breaker pattern

6. Implementing a circuit breaker

```python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, func):
        """Invoke a coroutine through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Decide whether the breaker should try to reset."""
        if self.last_failure_time is None:
            return False
        elapsed = time.time() - self.last_failure_time
        return elapsed >= self.recovery_timeout

    def _on_success(self):
        """Handle a successful call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle a failed call."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```

Monitoring and logging

7. Error monitoring

```python
class ErrorMonitor:
    def __init__(self):
        self.error_counts = {}
        self.error_rates = {}

    def record_error(self, error_type: str):
        """Record an error."""
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

    def get_error_rate(self, error_type: str) -> float:
        """Return the share of errors of a given type."""
        total = sum(self.error_counts.values())
        if total == 0:
            return 0.0
        return self.error_counts.get(error_type, 0) / total
```

Best practices

- Distinguish error types: correctly identify retryable vs. non-retryable errors
- Tune retry parameters: adjust retry counts and delays to the business scenario
- Use a circuit breaker: prevent cascading failures
- Log in detail: record every error and retry
- Monitor and alert: watch error rates in real time and set up alerts
- Degrade gracefully: provide a fallback when a service is unavailable

With these strategies you can build a robust MCP system that handles failures effectively.
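As a runnable illustration of the exponential backoff pattern above (delays shortened for the demo, and the flaky call simulated with a counter), a call that fails twice with a transient error and then succeeds:

```python
import asyncio
import random

async def exponential_backoff_retry(func, max_retries=3, base_delay=0.01, max_delay=1.0):
    # Same shape as the helper above, with short delays so the demo runs fast
    last_exception = None
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 0.01), max_delay)
            await asyncio.sleep(delay)
    raise last_exception

calls = {"n": 0}

async def flaky():
    # Fails twice, then succeeds — simulates a transient network error
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("temporary failure")
    return "ok"

result = asyncio.run(exponential_backoff_retry(flaky))
print(result)       # → ok
print(calls["n"])   # → 3
```

In production the `is_retryable` check from the smart retry strategy would gate which exceptions trigger another attempt; here every exception is retried for brevity.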

How does MCP integrate with a microservices architecture?

MCP 与微服务架构的结合可以构建更灵活、可扩展的 AI 应用系统。以下是详细的集成方法和架构设计:架构设计原则MCP 与微服务结合时应遵循以下原则:服务拆分:根据业务领域拆分 MCP 服务器独立部署:每个 MCP 服务器独立部署和扩展服务发现:使用服务发现机制动态定位服务API 网关:通过 API 网关统一管理 MCP 服务配置中心:集中管理所有 MCP 服务配置1. 微服务架构设计# 微服务架构示例"""架构层次:1. API Gateway (Kong/Nginx) - 请求路由 - 认证授权 - 限流熔断 - 负载均衡2. Service Mesh (Istio/Linkerd) - 服务间通信 - 流量管理 - 安全策略 - 可观测性3. MCP Services - Database MCP Server - File MCP Server - API MCP Server - Analytics MCP Server4. Infrastructure - Service Discovery (Consul/Eureka) - Configuration Center (Apollo/Consul) - Message Queue (Kafka/RabbitMQ) - Cache (Redis/Memcached)"""# 服务定义class MCPServiceRegistry: def __init__(self): self.services = {} def register_service( self, service_name: str, service_url: str, capabilities: list ): """注册 MCP 服务""" self.services[service_name] = { "url": service_url, "capabilities": capabilities, "status": "healthy", "registered_at": datetime.now() } def discover_service(self, capability: str) -> list: """发现具有特定能力的服务""" return [ { "name": name, "url": info["url"] } for name, info in self.services.items() if capability in info["capabilities"] ] def get_service_status(self, service_name: str) -> dict: """获取服务状态""" return self.services.get(service_name, { "status": "not_found" })2. 
2. Inter-Service Communication

```python
import logging
import random
from typing import List, Dict, Any

import httpx


class MCPServiceClient:
    def __init__(self, service_registry: MCPServiceRegistry):
        self.registry = service_registry
        self.http_client = httpx.AsyncClient(timeout=30.0)

    async def call_service(
        self,
        capability: str,
        tool_name: str,
        params: dict
    ) -> Dict[str, Any]:
        """Call an MCP service."""
        # Discover candidate services
        services = self.registry.discover_service(capability)
        if not services:
            raise ValueError(
                f"No service found providing capability: {capability}"
            )

        # Pick a service (load balancing)
        service = self._select_service(services)

        # Call it
        try:
            response = await self.http_client.post(
                f"{service['url']}/tools/call",
                json={"name": tool_name, "arguments": params}
            )
            return response.json()
        except Exception as e:
            # The call failed; fall back to the remaining services
            logging.error(f"Service call failed: {e}")
            return await self._retry_with_fallback(
                services, service, tool_name, params
            )

    def _select_service(self, services: List[dict]) -> dict:
        """Pick a service (simple random choice)."""
        return random.choice(services)

    async def _retry_with_fallback(
        self,
        services: List[dict],
        failed_service: dict,
        tool_name: str,
        params: dict
    ) -> Dict[str, Any]:
        """Retry against the remaining services as fallbacks."""
        for service in services:
            if service == failed_service:
                continue
            try:
                response = await self.http_client.post(
                    f"{service['url']}/tools/call",
                    json={"name": tool_name, "arguments": params}
                )
                return response.json()
            except Exception as e:
                logging.error(f"Fallback service call failed: {e}")
                continue
        raise Exception("All service calls failed")
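`_select_service` simply picks an instance from the discovered list. A stateful round-robin alternative can be sketched with `itertools.cycle`; the selector class below is a hypothetical helper, not part of the original client:

```python
import itertools


class RoundRobinSelector:
    """Cycle through service instances in registration order."""

    def __init__(self, services: list):
        self._cycle = itertools.cycle(services)

    def select(self) -> dict:
        return next(self._cycle)


rr = RoundRobinSelector([{"url": "http://a"}, {"url": "http://b"}])
print([rr.select()["url"] for _ in range(4)])
# → ['http://a', 'http://b', 'http://a', 'http://b']
```

Random choice is stateless and works across processes, while round-robin gives a more even spread per client; a shared load balancer is the usual production answer.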
3. API Gateway Integration

```python
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="MCP API Gateway")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Service routing
# (get_client, get_registry, and determine_capability are provided
# elsewhere in the gateway application.)
@app.post("/api/v1/tools/{tool_name}")
async def route_tool_call(
    tool_name: str,
    request: Request,
    client: MCPServiceClient = Depends(get_client)
):
    """Route a tool-call request to the right MCP service."""
    try:
        # Parse the request body
        params = await request.json()

        # Determine the target capability
        capability = determine_capability(tool_name)

        # Call the service
        result = await client.call_service(
            capability=capability,
            tool_name=tool_name,
            params=params
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/v1/services")
async def list_services(
    registry: MCPServiceRegistry = Depends(get_registry)
):
    """List all available services."""
    return {
        "services": [
            {
                "name": name,
                "capabilities": info["capabilities"],
                "status": info["status"]
            }
            for name, info in registry.services.items()
        ]
    }


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}
```
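The gateway calls `determine_capability(tool_name)` without defining it. One simple way to implement it is a prefix-to-capability map; the prefixes and capability names below are hypothetical and would need to match your tool naming scheme:

```python
# Hypothetical mapping from tool-name prefix to service capability
CAPABILITY_PREFIXES = {
    "sql_": "database",
    "file_": "filesystem",
    "http_": "api",
}


def determine_capability(tool_name: str) -> str:
    """Map a tool name to the capability of the service hosting it."""
    for prefix, capability in CAPABILITY_PREFIXES.items():
        if tool_name.startswith(prefix):
            return capability
    raise ValueError(f"Unknown tool: {tool_name}")


print(determine_capability("sql_query"))  # → database
```

A registry-driven alternative is to ask each MCP server for its tool list at startup and build the tool-to-service map dynamically.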
4. Service Discovery

```python
import time

import consul


class ConsulServiceDiscovery:
    def __init__(self, consul_host: str = "localhost",
                 consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register(
        self,
        service_name: str,
        service_id: str,
        address: str,
        port: int,
        tags: list = None
    ):
        """Register a service with Consul."""
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{address}:{port}/health",
                interval="10s"
            )
        )

    def deregister(self, service_id: str):
        """Deregister a service from Consul."""
        self.consul.agent.service.deregister(service_id)

    def discover(self, service_name: str) -> list:
        """Discover healthy instances of a service."""
        _, services = self.consul.health.service(service_name, passing=True)
        return [
            {
                "id": service["Service"]["ID"],
                "address": service["Service"]["Address"],
                "port": service["Service"]["Port"],
                "tags": service["Service"]["Tags"]
            }
            for service in services
        ]

    def watch_service(self, service_name: str, callback: callable):
        """Watch a service for changes (blocking-query polling loop)."""
        index = None
        while True:
            index, services = self.consul.health.service(
                service_name, index=index, passing=True
            )
            callback(services)
            # Wait before polling again
            time.sleep(10)
```
5. Configuration Center Integration

```python
import etcd3
from pydantic import BaseSettings


class EtcdConfigCenter:
    def __init__(self, etcd_host: str = "localhost", etcd_port: int = 2379):
        self.etcd = etcd3.client(host=etcd_host, port=etcd_port)

    def get_config(self, key: str, default: str = None) -> str:
        """Fetch a configuration value, falling back to a default."""
        value, _ = self.etcd.get(key)
        return value.decode() if value else default

    def set_config(self, key: str, value: str):
        """Store a configuration value."""
        self.etcd.put(key, value)

    def watch_config(self, key: str, callback: callable):
        """Watch a key for changes."""
        events, cancel = self.etcd.watch(key)
        for event in events:
            callback(event.key.decode(), event.value.decode())
        return cancel


# MCP service configuration
class MCPServiceConfig(BaseSettings):
    service_name: str
    service_port: int
    database_url: str
    redis_url: str
    log_level: str = "INFO"

    @classmethod
    def from_config_center(cls, config_center: EtcdConfigCenter):
        """Load configuration from the config center."""
        return cls(
            service_name=config_center.get_config("/mcp/service/name"),
            service_port=int(config_center.get_config("/mcp/service/port")),
            database_url=config_center.get_config("/mcp/database/url"),
            redis_url=config_center.get_config("/mcp/redis/url"),
            log_level=config_center.get_config("/mcp/log/level", "INFO")
        )
```
6. Message Queue Integration

```python
import json

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer


class KafkaMCPIntegration:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self.consumer = None

    async def start_producer(self):
        """Start the producer."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        await self.producer.start()

    async def send_message(self, topic: str, message: dict):
        """Publish a message and wait for the broker to acknowledge it."""
        await self.producer.send_and_wait(topic, value=message)

    async def start_consumer(self, topic: str, group_id: str,
                             callback: callable):
        """Start a consumer and dispatch each message to the callback."""
        self.consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        await self.consumer.start()
        async for message in self.consumer:
            await callback(message.value)

    async def stop(self):
        """Stop the producer and consumer."""
        if self.producer:
            await self.producer.stop()
        if self.consumer:
            await self.consumer.stop()
```
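When several MCP services publish to the same topics, a consistent message envelope keeps consumers simple. The field names below are a hypothetical convention, not part of any MCP or Kafka standard:

```python
import json
import uuid
from datetime import datetime, timezone


def make_tool_call_event(tool_name: str, arguments: dict) -> dict:
    """Build a hypothetical event envelope for publishing a tool call."""
    return {
        "event_id": str(uuid.uuid4()),      # unique id for deduplication
        "event_type": "tool_call",
        "tool": tool_name,
        "arguments": arguments,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


event = make_tool_call_event("sql_query", {"query": "SELECT 1"})
# Same serialization the producer above uses as its value_serializer
payload = json.dumps(event).encode("utf-8")
```

The `event_id` lets consumers deduplicate after redeliveries, which matters because Kafka's default delivery guarantee is at-least-once.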
7. Observability

```python
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


# Tracing setup
def setup_tracing(app: FastAPI, service_name: str):
    """Configure distributed tracing."""
    trace.set_tracer_provider(TracerProvider())

    jaeger_exporter = JaegerExporter(
        agent_host_name="localhost",
        agent_port=6831,
    )

    span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Auto-instrument the FastAPI app
    FastAPIInstrumentor.instrument_app(
        app, tracer_provider=trace.get_tracer_provider()
    )


# Metrics setup
from prometheus_fastapi_instrumentator import Instrumentator


def setup_metrics(app: FastAPI):
    """Configure metrics collection."""
    Instrumentator().instrument(app).expose(app)
```

Best practices:

- Service decomposition: split services along business capabilities and data boundaries
- Independent deployment: deploy and scale each service on its own
- Service mesh: manage inter-service communication with a service mesh
- Configuration management: centralize configuration and support dynamic updates
- Monitoring and alerting: implement comprehensive monitoring and alerts
- Fault tolerance: implement circuit breaking, degradation, and retries
- Security hardening: enforce inter-service authentication and authorization
- Continuous optimization: keep monitoring and tuning system performance

Combining MCP with a microservices architecture lets you build more flexible, scalable AI application systems.
Views: 0 · Feb 19, 21:32

What are the application scenarios of MCP in real projects?

MCP has many application scenarios in real projects. Here are some typical use cases:

1. Database queries and operations
- Scenario: let an LLM query and analyze databases directly
- Implementation: build an MCP database server that exposes SQL query tools
- Advantages: no complex query logic to hand-write; the LLM understands natural language and generates SQL
- Examples: enterprise data analysis, report generation, data exploration

2. API integration and service calls
- Scenario: integrate third-party services (payments, weather, maps, etc.)
- Implementation: create an MCP tool per service with clearly defined parameters and return formats
- Advantages: a uniform service-call interface that is easy to maintain and extend
- Examples: order processing, logistics tracking, and payment integration on e-commerce platforms

3. File system operations
- Scenario: let the LLM read, write, and search files
- Implementation: build a filesystem MCP server exposing file-operation tools
- Advantages: secure file access control; supports many file formats
- Examples: code generation, document processing, log analysis

4. Real-time data monitoring
- Scenario: monitor system metrics, logs, and events
- Implementation: build a monitoring MCP server with real-time data query tools
- Advantages: real-time data access with alerting and notifications
- Examples: server monitoring, application performance monitoring, security event detection

5. Knowledge base and document retrieval
- Scenario: retrieve information from an enterprise knowledge base
- Implementation: build a knowledge-base MCP server with integrated vector search
- Advantages: semantic search and context-aware answer generation
- Examples: customer support, internal knowledge lookup, technical documentation retrieval

6. Automated workflows
- Scenario: execute complex automated tasks
- Implementation: build a workflow MCP server that orchestrates multiple tools
- Advantages: flexible task orchestration with conditional branches and loops
- Examples: CI/CD pipelines, data-processing pipelines, batch operations

7. IoT device control
- Scenario: control smart-home or industrial devices
- Implementation: build an IoT MCP server that exposes device-control interfaces
- Advantages: a unified device-control protocol supporting many device types
- Examples: smart-home control, industrial automation, environmental monitoring

8. Code execution and testing
- Scenario: run code snippets or execute tests
- Implementation: build a code-execution MCP server with a secure sandbox
- Advantages: isolated execution environments supporting multiple programming languages
- Examples: code validation, algorithm demos, online programming education

Real-world case: an e-commerce platform used MCP to integrate order management, inventory lookup, payment processing, and other services. The LLM could handle customer queries, generate reports, and execute order operations directly, greatly improving support efficiency and the level of automation.

These scenarios demonstrate MCP's power and flexibility, making it an ideal bridge between AI models and real business systems.
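The heart of use case 1 is a guarded SQL tool: accept a query string, allow only read-only statements, and return rows. The sketch below shows that guard logic independent of any MCP SDK, using an in-memory SQLite database with a made-up `orders` table:

```python
import sqlite3


def run_readonly_query(conn: sqlite3.Connection, sql: str) -> list:
    """Execute a query only if it is a read-only SELECT statement."""
    if not sql.lstrip().lower().startswith("select"):
        raise ValueError("Only SELECT statements are allowed")
    return conn.execute(sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
conn.execute("INSERT INTO orders VALUES (1, 9.5)")

print(run_readonly_query(conn, "SELECT total FROM orders"))  # → [(9.5,)]
```

A production database MCP server would wrap this function as a registered tool and typically add parameterized queries, row limits, and per-table access control on top of the prefix check.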
Views: 0 · Feb 19, 21:32

How does MCP differ from traditional function calling, and what are its advantages?

Compared with traditional function calling mechanisms, MCP differs in several key ways:

1. Standardization
- Traditional function calling: each AI model provider has its own function-calling format and conventions
- MCP: a single standardized protocol; different models and tools share the same interface

2. Integration complexity
- Traditional: every model-tool combination requires custom development and maintenance
- MCP: build once, reuse everywhere, drastically lowering integration cost

3. Tool discovery
- Traditional: tool lists are usually hard-coded or configured manually
- MCP: supports dynamic tool discovery and registration; the list of available tools is fetched automatically

4. Context management
- Traditional: context handling is simple and relies mostly on the model itself
- MCP: built-in context management supporting richer conversation state

5. Extensibility
- Traditional: adding a new tool means code changes and redeployment
- MCP: tools can be added and removed at runtime without restarting the service

6. Error handling
- Traditional: error handling varies by provider, with no common standard
- MCP: defines a uniform error format and handling flow

7. Security
- Traditional: security depends on each provider's implementation
- MCP: a built-in security layer with standardized authentication and authorization

8. Interoperability
- Traditional: different systems are hard to make interoperable
- MCP: designed from the start for cross-platform, cross-language interoperability

Practical example:
- Traditional approach: implement a database-query tool separately for GPT-4, Claude, Llama, and every other model
- MCP approach: implement one MCP database server that every model can use directly

This standardized, simplified design makes MCP an ideal solution for connecting AI models to external systems.
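Points 3 and 5 (dynamic discovery and runtime extensibility) can be illustrated with a toy registry: clients ask "what tools exist?" at runtime instead of shipping a hard-coded list. This is a conceptual sketch, not the MCP SDK's actual API:

```python
class ToolRegistry:
    """Sketch of MCP-style dynamic tool registration (hypothetical API)."""

    def __init__(self):
        self._tools = {}

    def register(self, name: str, description: str, handler):
        self._tools[name] = {"description": description, "handler": handler}

    def unregister(self, name: str):
        self._tools.pop(name, None)

    def list_tools(self) -> list:
        # What a client sees when it asks for capabilities at runtime
        return [{"name": n, "description": t["description"]}
                for n, t in self._tools.items()]

    def call(self, name: str, **kwargs):
        return self._tools[name]["handler"](**kwargs)


registry = ToolRegistry()
registry.register("add", "Add two numbers", lambda a, b: a + b)
print(registry.list_tools())  # → [{'name': 'add', 'description': 'Add two numbers'}]
print(registry.call("add", a=2, b=3))  # → 5
```

With a hard-coded scheme, adding or removing `add` would require editing client code and redeploying; here it is a runtime operation, which is the extensibility advantage the comparison describes.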
Views: 0 · Feb 19, 21:32