乐闻世界logo
搜索文章和话题

MCP 的插件系统是如何工作的?

2月19日 21:39

MCP 的插件系统允许开发者扩展 MCP 服务器的功能,无需修改核心代码。以下是详细的插件架构和实现方法:

插件架构设计

MCP 插件系统应考虑以下方面:

  1. 插件发现:自动发现和加载插件
  2. 插件生命周期:管理插件的加载、初始化、卸载
  3. 插件隔离:确保插件之间相互隔离
  4. 插件通信:提供插件间的通信机制
  5. 插件安全:限制插件的权限和资源访问

1. 插件接口定义

python
from abc import ABC, abstractmethod from typing import Dict, Any, List class MCPPlugin(ABC): """MCP 插件基类""" def __init__(self, config: Dict[str, Any] = None): self.config = config or {} self.name = self.__class__.__name__ self.version = getattr(self, 'VERSION', '1.0.0') @abstractmethod async def initialize(self, server): """初始化插件""" pass @abstractmethod async def shutdown(self): """关闭插件""" pass @abstractmethod def get_tools(self) -> List[Dict[str, Any]]: """获取插件提供的工具""" return [] @abstractmethod def get_resources(self) -> List[Dict[str, Any]]: """获取插件提供的资源""" return [] @abstractmethod def get_prompts(self) -> List[Dict[str, Any]]: """获取插件提供的提示词""" return [] def get_metadata(self) -> Dict[str, Any]: """获取插件元数据""" return { "name": self.name, "version": self.version, "description": getattr(self, 'DESCRIPTION', ''), "author": getattr(self, 'AUTHOR', ''), "dependencies": getattr(self, 'DEPENDENCIES', []) }

2. 插件管理器

python
import importlib import inspect import os from pathlib import Path from typing import Dict, List, Type class PluginManager: def __init__(self, server): self.server = server self.plugins: Dict[str, MCPPlugin] = {} self.plugin_directories: List[str] = [] def add_plugin_directory(self, directory: str): """添加插件目录""" if directory not in self.plugin_directories: self.plugin_directories.append(directory) async def discover_plugins(self): """发现插件""" discovered_plugins = [] for directory in self.plugin_directories: plugin_path = Path(directory) if not plugin_path.exists(): continue # 遍历插件目录 for item in plugin_path.iterdir(): if item.is_file() and item.suffix == '.py': # 单文件插件 discovered_plugins.append(str(item)) elif item.is_dir() and (item / '__init__.py').exists(): # 包插件 discovered_plugins.append(str(item)) return discovered_plugins async def load_plugin(self, plugin_path: str) -> bool: """加载插件""" try: # 动态导入插件模块 spec = importlib.util.spec_from_file_location( "plugin_module", plugin_path ) if spec is None or spec.loader is None: return False module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # 查找插件类 plugin_class = None for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, MCPPlugin) and obj is not MCPPlugin): plugin_class = obj break if plugin_class is None: return False # 创建插件实例 plugin = plugin_class() # 初始化插件 await plugin.initialize(self.server) # 注册插件 self.plugins[plugin.name] = plugin # 注册插件提供的工具、资源、提示词 self._register_plugin_tools(plugin) self._register_plugin_resources(plugin) self._register_plugin_prompts(plugin) return True except Exception as e: print(f"加载插件失败 {plugin_path}: {e}") return False def _register_plugin_tools(self, plugin: MCPPlugin): """注册插件工具""" tools = plugin.get_tools() for tool_info in tools: @self.server.tool( name=tool_info["name"], description=tool_info["description"] ) async def tool_wrapper(**kwargs): return await tool_info["function"](**kwargs) def _register_plugin_resources(self, plugin: MCPPlugin): """注册插件资源""" resources = plugin.get_resources() for resource_info in resources: @self.server.resource( uri=resource_info["uri"], name=resource_info["name"], description=resource_info["description"] ) async def resource_wrapper(): return await resource_info["function"]() def _register_plugin_prompts(self, plugin: MCPPlugin): """注册插件提示词""" prompts = plugin.get_prompts() for prompt_info in prompts: @self.server.prompt( name=prompt_info["name"], description=prompt_info["description"] ) async def prompt_wrapper(**kwargs): return await prompt_info["function"](**kwargs) async def unload_plugin(self, plugin_name: str) -> bool: """卸载插件""" if plugin_name not in self.plugins: return False plugin = self.plugins[plugin_name] try: # 关闭插件 await plugin.shutdown() # 从插件列表中移除 del self.plugins[plugin_name] return True except Exception as e: print(f"卸载插件失败 {plugin_name}: {e}") return False async def reload_plugin(self, plugin_name: str) -> bool: """重新加载插件""" if plugin_name not in self.plugins: return False # 先卸载 await self.unload_plugin(plugin_name) # 重新加载(需要记录插件路径) # 这里简化处理,实际需要保存插件路径 return True def get_plugin_info(self, plugin_name: str) -> Dict[str, Any]: """获取插件信息""" if plugin_name not in self.plugins: return {} plugin = self.plugins[plugin_name] return plugin.get_metadata() def list_plugins(self) -> List[Dict[str, Any]]: """列出所有插件""" return [ plugin.get_metadata() for plugin in self.plugins.values() ]

3. 示例插件实现

python
# plugins/database_plugin.py class DatabasePlugin(MCPPlugin): """数据库插件""" VERSION = "1.0.0" DESCRIPTION = "提供数据库查询和管理功能" AUTHOR = "Your Name" DEPENDENCIES = ["sqlalchemy"] def __init__(self, config: Dict[str, Any] = None): super().__init__(config) self.db_connection = None async def initialize(self, server): """初始化数据库连接""" from sqlalchemy import create_engine db_url = self.config.get("database_url", "sqlite:///mcp.db") self.db_connection = create_engine(db_url) print(f"数据库插件 {self.name} 已初始化") async def shutdown(self): """关闭数据库连接""" if self.db_connection: self.db_connection.dispose() print(f"数据库插件 {self.name} 已关闭") def get_tools(self) -> List[Dict[str, Any]]: """获取数据库工具""" return [ { "name": "query_database", "description": "执行数据库查询", "function": self._query_database }, { "name": "execute_sql", "description": "执行 SQL 语句", "function": self._execute_sql } ] def get_resources(self) -> List[Dict[str, Any]]: """获取数据库资源""" return [ { "uri": "db://schema", "name": "数据库模式", "description": "数据库表结构", "function": self._get_schema } ] def get_prompts(self) -> List[Dict[str, Any]]: """获取数据库提示词""" return [ { "name": "generate_query", "description": "生成 SQL 查询提示词", "function": self._generate_query_prompt } ] async def _query_database(self, query: str) -> str: """执行数据库查询""" from sqlalchemy import text with self.db_connection.connect() as conn: result = conn.execute(text(query)) rows = result.fetchall() return f"查询结果: {len(rows)} 行" async def _execute_sql(self, sql: str) -> str: """执行 SQL 语句""" from sqlalchemy import text with self.db_connection.connect() as conn: conn.execute(text(sql)) conn.commit() return "SQL 执行成功" async def _get_schema(self) -> str: """获取数据库模式""" from sqlalchemy import inspect inspector = inspect(self.db_connection) tables = inspector.get_table_names() return f"数据库表: {', '.join(tables)}" async def _generate_query_prompt(self, table_name: str) -> str: """生成查询提示词""" return f""" 请为表 {table_name} 生成 SQL 查询语句。 要求: 1. 只使用 SELECT 查询 2. 包含适当的 WHERE 条件 3. 添加 LIMIT 限制结果数量 """

4. 插件配置

python
import json from pathlib import Path class PluginConfig: def __init__(self, config_dir: str = "config/plugins"): self.config_dir = Path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) def load_config(self, plugin_name: str) -> Dict[str, Any]: """加载插件配置""" config_file = self.config_dir / f"{plugin_name}.json" if not config_file.exists(): return {} with open(config_file, 'r') as f: return json.load(f) def save_config( self, plugin_name: str, config: Dict[str, Any] ): """保存插件配置""" config_file = self.config_dir / f"{plugin_name}.json" with open(config_file, 'w') as f: json.dump(config, f, indent=2) def get_all_configs(self) -> Dict[str, Dict[str, Any]]: """获取所有插件配置""" configs = {} for config_file in self.config_dir.glob("*.json"): plugin_name = config_file.stem with open(config_file, 'r') as f: configs[plugin_name] = json.load(f) return configs

5. 插件依赖管理

python
from typing import Dict, List, Set class PluginDependencyManager: def __init__(self): self.dependencies: Dict[str, Set[str]] = {} self.dependents: Dict[str, Set[str]] = {} def add_dependency(self, plugin_name: str, dependency: str): """添加依赖关系""" if plugin_name not in self.dependencies: self.dependencies[plugin_name] = set() self.dependencies[plugin_name].add(dependency) if dependency not in self.dependents: self.dependents[dependency] = set() self.dependents[dependency].add(plugin_name) def get_load_order(self, plugins: List[str]) -> List[str]: """获取插件加载顺序(拓扑排序)""" visited = set() result = [] def visit(plugin_name: str): if plugin_name in visited: return visited.add(plugin_name) # 先加载依赖 for dep in self.dependencies.get(plugin_name, []): visit(dep) result.append(plugin_name) for plugin_name in plugins: visit(plugin_name) return result def check_circular_dependency(self) -> bool: """检查循环依赖""" visited = set() recursion_stack = set() def has_cycle(plugin_name: str) -> bool: visited.add(plugin_name) recursion_stack.add(plugin_name) for dep in self.dependencies.get(plugin_name, []): if dep not in visited: if has_cycle(dep): return True elif dep in recursion_stack: return True recursion_stack.remove(plugin_name) return False for plugin_name in self.dependencies: if plugin_name not in visited: if has_cycle(plugin_name): return True return False

6. 插件沙箱

python
import sys from typing import Any class PluginSandbox: def __init__(self): self.restricted_modules = { 'os', 'sys', 'subprocess', 'socket', 'pickle', 'shelve', 'marshal' } def create_sandbox(self, plugin_name: str) -> dict: """创建插件沙箱环境""" safe_globals = { '__builtins__': self._create_safe_builtins(), '__name__': f'plugin_{plugin_name}', } return safe_globals def _create_safe_builtins(self) -> dict: """创建安全的内置函数""" safe_builtins = {} # 允许的安全内置函数 allowed_builtins = [ 'abs', 'all', 'any', 'bool', 'dict', 'enumerate', 'filter', 'float', 'int', 'len', 'list', 'map', 'max', 'min', 'range', 'set', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip' ] for name in allowed_builtins: if hasattr(__builtins__, name): safe_builtins[name] = getattr(__builtins__, name) return safe_builtins def execute_in_sandbox( self, code: str, sandbox_env: dict ) -> Any: """在沙箱中执行代码""" try: exec(code, sandbox_env) return True except Exception as e: print(f"沙箱执行失败: {e}") return False

最佳实践:

  1. 插件隔离:确保插件之间相互隔离,避免冲突
  2. 依赖管理:正确处理插件依赖关系
  3. 错误处理:插件错误不应影响核心系统
  4. 版本兼容:支持插件版本管理和兼容性检查
  5. 安全限制:限制插件的权限和资源访问
  6. 文档完善:为每个插件提供清晰的文档

通过完善的插件系统,可以灵活扩展 MCP 服务器的功能,满足各种定制化需求。

标签:MCP