MCP can be integrated with various AI frameworks and tools to extend its functionality and broaden its application scenarios. The sections below walk through detailed integration methods and best practices.
Integration Architecture Design
An MCP integration should account for the following aspects:
- Framework Compatibility: Ensure compatibility with the target framework
- Performance Impact: Minimize the impact on system performance
- Functional Completeness: Preserve the full functionality of both MCP and the framework
- Error Handling: Handle errors that occur during integration gracefully
- Configuration Management: Manage integration configuration in one place (see the sketch after this list)
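As a starting point for the configuration-management item, here is a minimal sketch of a unified integration config. The field names are illustrative assumptions, not part of MCP or any framework:

```python
from dataclasses import dataclass, field

@dataclass
class MCPIntegrationConfig:
    """Unified configuration shared by all framework integrations (illustrative)."""
    server_name: str = "mcp-server"
    request_timeout_s: float = 30.0          # per-call timeout for MCP tool calls
    max_retries: int = 2                     # retries on transient integration errors
    enable_result_cache: bool = True         # cache frequently called tool results
    extra: dict = field(default_factory=dict)  # framework-specific overrides

config = MCPIntegrationConfig(extra={"langchain": {"model": "gpt-4"}})
```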
1. Integration with LangChain
```python
from langchain import hub  # requires the langchainhub package
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from mcp.server import Server

class MCPLangChainIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.langchain_tools = []

    async def convert_mcp_to_langchain_tools(self) -> list:
        """Convert MCP tools to LangChain tools"""
        mcp_tools = await self.mcp_server.list_tools()
        langchain_tools = []
        for tool_info in mcp_tools:
            tool = Tool(
                name=tool_info["name"],
                description=tool_info["description"],
                func=None,
                # Async callables go to `coroutine`, not `func`
                coroutine=self._create_tool_wrapper(tool_info["name"])
            )
            langchain_tools.append(tool)
        return langchain_tools

    def _create_tool_wrapper(self, tool_name: str):
        """Create an async wrapper around an MCP tool call"""
        async def wrapper(**kwargs):
            result = await self.mcp_server.call_tool(tool_name, kwargs)
            return result
        return wrapper

    async def create_langchain_agent(self):
        """Create a LangChain agent backed by MCP tools"""
        tools = await self.convert_mcp_to_langchain_tools()
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        # create_openai_tools_agent requires a prompt; pull the standard one
        prompt = hub.pull("hwchase17/openai-tools-agent")
        agent = create_openai_tools_agent(llm, tools, prompt)
        agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
        return agent_executor

    async def run_agent(self, query: str):
        """Run the agent on a query"""
        agent_executor = await self.create_langchain_agent()
        result = await agent_executor.ainvoke({"input": query})
        return result["output"]
```
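A usage sketch for the class above, assuming `server` is an already-initialized `mcp.server.Server` and an OpenAI API key is configured. Note that `run_agent` rebuilds the agent on every call; building the executor once and reusing it is cheaper:

```python
import asyncio

# Hypothetical wiring: `server` is an already-initialized mcp.server.Server
async def main():
    integration = MCPLangChainIntegration(server)
    executor = await integration.create_langchain_agent()  # build once, reuse
    for question in ["List the available tools", "Read the project README"]:
        result = await executor.ainvoke({"input": question})
        print(result["output"])

asyncio.run(main())
```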
2. Integration with LlamaIndex
```python
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
from mcp.server import Server

class MCPLlamaIndexIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server

    def _create_tool_fn(self, tool_name: str):
        """Create an async function that forwards a query to an MCP tool"""
        async def tool_fn(query: str) -> str:
            # The `query` parameter name is illustrative; real MCP tools
            # define their own input schemas
            result = await self.mcp_server.call_tool(tool_name, {"query": query})
            return str(result)
        return tool_fn

    async def create_llamaindex_tools(self) -> list:
        """Wrap MCP tools as LlamaIndex tools"""
        # FunctionTool wraps a plain (async) callable; QueryEngineTool would
        # require an actual query engine instance
        mcp_tools = await self.mcp_server.list_tools()
        tools = []
        for tool_info in mcp_tools:
            tool = FunctionTool.from_defaults(
                async_fn=self._create_tool_fn(tool_info["name"]),
                name=tool_info["name"],
                description=tool_info["description"],
            )
            tools.append(tool)
        return tools

    async def create_react_agent(self):
        """Create a ReAct Agent over the wrapped tools"""
        tools = await self.create_llamaindex_tools()
        agent = ReActAgent.from_tools(tools=tools, verbose=True)
        return agent

    async def query_with_agent(self, query: str):
        """Query with the Agent"""
        agent = await self.create_react_agent()
        response = await agent.achat(query)
        return response.response
```
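A corresponding usage sketch, again assuming an initialized `server` and a configured default LLM for LlamaIndex:

```python
import asyncio

# Hypothetical wiring: `server` is an already-initialized mcp.server.Server
async def main():
    integration = MCPLlamaIndexIntegration(server)
    answer = await integration.query_with_agent("Which tools can query the database?")
    print(answer)

asyncio.run(main())
```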
3. Integration with AutoGPT
```python
# Note: AutoGPT's internal Python API is unstable and varies between releases;
# the imports and constructor arguments below follow one historical layout and
# may need adjustment for the version you use.
from autogpt.agent.agent import Agent
from autogpt.config import Config
from autogpt.models.command import Command
from mcp.server import Server

class MCPAutoGPTIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.command_registry = {}

    async def register_mcp_commands(self):
        """Register MCP tools as AutoGPT commands"""
        mcp_tools = await self.mcp_server.list_tools()
        for tool_info in mcp_tools:
            command = Command(
                name=tool_info["name"],
                description=tool_info["description"],
                function=self._create_command_function(tool_info["name"])
            )
            self.command_registry[tool_info["name"]] = command

    def _create_command_function(self, tool_name: str):
        """Create a command function"""
        async def command_function(**kwargs):
            result = await self.mcp_server.call_tool(tool_name, kwargs)
            return result
        return command_function

    async def create_autogpt_agent(self, config: Config):
        """Create an AutoGPT Agent"""
        await self.register_mcp_commands()
        agent = Agent(
            ai_name="MCP-Agent",
            ai_role="Assistant",
            commands=self.command_registry,
            config=config
        )
        return agent

    async def run_autogpt_task(self, task: str):
        """Run an AutoGPT task"""
        config = Config()
        agent = await self.create_autogpt_agent(config)
        result = await agent.run(task)
        return result
```
4. Integration with FastAPI
```python
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from mcp.server import Server

class MCPFastAPIIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.app = FastAPI(title="MCP API")
        self._setup_middleware()
        self._setup_routes()

    def _setup_middleware(self):
        """Setup middleware"""
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],  # restrict to known origins in production
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def _setup_routes(self):
        """Setup routes"""

        @self.app.get("/tools")
        async def list_tools():
            """List all tools"""
            tools = await self.mcp_server.list_tools()
            return {"tools": tools}

        @self.app.post("/tools/{tool_name}/call")
        async def call_tool(tool_name: str, params: dict):
            """Call a tool"""
            try:
                result = await self.mcp_server.call_tool(tool_name, params)
                return {"result": result}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/resources")
        async def list_resources():
            """List all resources"""
            resources = await self.mcp_server.list_resources()
            return {"resources": resources}

        # `:path` lets the parameter match URIs that contain slashes
        @self.app.get("/resources/{uri:path}")
        async def read_resource(uri: str):
            """Read a resource"""
            try:
                content = await self.mcp_server.read_resource(uri)
                return {"content": content}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/prompts")
        async def list_prompts():
            """List all prompts"""
            prompts = await self.mcp_server.list_prompts()
            return {"prompts": prompts}

        @self.app.get("/health")
        async def health_check():
            """Health check"""
            return {"status": "healthy"}

    def get_app(self) -> FastAPI:
        """Get the FastAPI application"""
        return self.app
```
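To actually serve the API, a minimal launch sketch using uvicorn (assuming `server` is an initialized MCP `Server`):

```python
import uvicorn

integration = MCPFastAPIIntegration(server)  # `server` assumed initialized
app = integration.get_app()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```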
5. Integration with WebSocket
```python
import json
from fastapi import WebSocket, WebSocketDisconnect
from mcp.server import Server

class MCPWebSocketIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.active_connections: list[WebSocket] = []

    async def handle_websocket(self, websocket: WebSocket):
        """Handle a WebSocket connection"""
        await websocket.accept()
        self.active_connections.append(websocket)
        try:
            while True:
                # Receive and parse a message
                data = await websocket.receive_text()
                message = json.loads(data)

                # Handle the message and send the response
                response = await self._handle_message(message)
                await websocket.send_text(json.dumps(response))
        except WebSocketDisconnect:
            pass  # client closed the connection
        except Exception as e:
            print(f"WebSocket error: {e}")
        finally:
            self.active_connections.remove(websocket)

    async def _handle_message(self, message: dict) -> dict:
        """Dispatch a message by type"""
        message_type = message.get("type")

        if message_type == "list_tools":
            tools = await self.mcp_server.list_tools()
            return {"type": "tools_list", "data": tools}
        elif message_type == "call_tool":
            result = await self.mcp_server.call_tool(
                message["tool_name"],
                message.get("params", {})
            )
            return {"type": "tool_result", "data": result}
        elif message_type == "list_resources":
            resources = await self.mcp_server.list_resources()
            return {"type": "resources_list", "data": resources}
        else:
            return {"type": "error", "message": "Unknown message type"}

    async def broadcast_message(self, message: dict):
        """Broadcast a message to all active connections"""
        message_text = json.dumps(message)
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message_text)
            except Exception as e:
                print(f"Failed to send message: {e}")
```
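The handler still needs to be attached to an ASGI route. A minimal wiring sketch with FastAPI, assuming an initialized `server`:

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()
ws_integration = MCPWebSocketIntegration(server)  # `server` assumed initialized

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Delegate the whole connection lifecycle to the integration class
    await ws_integration.handle_websocket(websocket)
```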
6. Integration with GraphQL
```python
import strawberry
from strawberry.scalars import JSON
from strawberry.types import Info
from mcp.server import Server

@strawberry.type
class MCPTool:
    name: str
    description: str

@strawberry.type
class MCPResource:
    uri: str
    name: str
    description: str

@strawberry.type
class Query:
    @strawberry.field
    async def tools(self, info: Info) -> list[MCPTool]:
        """Get all tools"""
        mcp_server = info.context["mcp_server"]
        tools = await mcp_server.list_tools()
        return [
            MCPTool(name=tool["name"], description=tool["description"])
            for tool in tools
        ]

    @strawberry.field
    async def resources(self, info: Info) -> list[MCPResource]:
        """Get all resources"""
        mcp_server = info.context["mcp_server"]
        resources = await mcp_server.list_resources()
        return [
            MCPResource(
                uri=resource["uri"],
                name=resource["name"],
                description=resource["description"]
            )
            for resource in resources
        ]

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def call_tool(self, tool_name: str, params: JSON, info: Info) -> str:
        """Call a tool. `params` uses Strawberry's JSON scalar, since a plain
        dict is not a valid GraphQL argument type."""
        mcp_server = info.context["mcp_server"]
        result = await mcp_server.call_tool(tool_name, params)
        return str(result)

class MCPGraphQLIntegration:
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.schema = strawberry.Schema(query=Query, mutation=Mutation)

    async def execute_query(self, query: str, variables: dict = None):
        """Execute a GraphQL query"""
        context = {"mcp_server": self.mcp_server}
        result = await self.schema.execute(
            query,
            variable_values=variables,
            context_value=context
        )
        if result.errors:
            return {"errors": [str(error) for error in result.errors]}
        return {"data": result.data}
```
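To expose the schema over HTTP, a sketch using Strawberry's FastAPI router (again assuming an initialized `server`; requires the `strawberry-graphql[fastapi]` extra):

```python
from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter

integration = MCPGraphQLIntegration(server)  # `server` assumed initialized

async def get_context():
    # Make the MCP server available to resolvers via info.context
    return {"mcp_server": integration.mcp_server}

graphql_app = GraphQLRouter(integration.schema, context_getter=get_context)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")
```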
7. Integration with gRPC
```python
import grpc
# mcp_pb2 / mcp_pb2_grpc are assumed to be generated (via grpcio-tools) from an
# mcp.proto that defines an `MCP` service with ListTools and CallTool RPCs
import mcp_pb2
import mcp_pb2_grpc
from mcp.server import Server

class MCPServicer(mcp_pb2_grpc.MCPServicer):
    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server

    async def ListTools(
        self,
        request: mcp_pb2.ListToolsRequest,
        context: grpc.ServicerContext
    ) -> mcp_pb2.ListToolsResponse:
        """List tools"""
        tools = await self.mcp_server.list_tools()
        tool_protos = [
            mcp_pb2.Tool(name=tool["name"], description=tool["description"])
            for tool in tools
        ]
        return mcp_pb2.ListToolsResponse(tools=tool_protos)

    async def CallTool(
        self,
        request: mcp_pb2.CallToolRequest,
        context: grpc.ServicerContext
    ) -> mcp_pb2.CallToolResponse:
        """Call a tool"""
        params = dict(request.params)
        result = await self.mcp_server.call_tool(request.tool_name, params)
        return mcp_pb2.CallToolResponse(result=str(result))

class MCPGRPCIntegration:
    def __init__(self, mcp_server: Server, port: int = 50051):
        self.mcp_server = mcp_server
        self.port = port
        self.server = None

    async def start_server(self):
        """Start the gRPC server"""
        # No thread pool needed for fully async servicers
        self.server = grpc.aio.server()
        mcp_pb2_grpc.add_MCPServicer_to_server(
            MCPServicer(self.mcp_server), self.server
        )
        self.server.add_insecure_port(f"[::]:{self.port}")
        await self.server.start()
        print(f"gRPC server started on port {self.port}")

    async def stop_server(self):
        """Stop the gRPC server"""
        if self.server:
            await self.server.stop(grace=None)
            print("gRPC server stopped")
```
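On the client side, a call sketch against the generated stub. The stub class name `MCPStub` is an assumption based on a service named `MCP` in the (not shown) mcp.proto:

```python
import asyncio
import grpc
import mcp_pb2
import mcp_pb2_grpc  # generated from the same (assumed) mcp.proto as the server

async def main():
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        stub = mcp_pb2_grpc.MCPStub(channel)  # stub name assumed from service `MCP`
        response = await stub.ListTools(mcp_pb2.ListToolsRequest())
        for tool in response.tools:
            print(tool.name, "-", tool.description)

asyncio.run(main())
```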
Best Practices:
- Async Processing: Use asynchronous programming to avoid blocking
- Error Handling: Handle integration errors gracefully and surface meaningful messages
- Performance Optimization: Cache the results of frequently called tools (see the sketch after this list)
- Logging: Log all integration operations
- Test Coverage: Write integration tests that verify each integration end to end
- Complete Documentation: Provide clear integration documentation
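For the caching point, a minimal sketch of a result cache around `call_tool`. It assumes tool results are deterministic for identical parameters, which is a simplification; tools with side effects must not be cached this way:

```python
import json

class CachedToolCaller:
    """Caches MCP tool results keyed by tool name and parameters (illustrative)."""

    def __init__(self, mcp_server):
        self.mcp_server = mcp_server
        self._cache: dict[str, object] = {}

    async def call_tool(self, tool_name: str, params: dict):
        # Only safe for tools whose output is deterministic for the same input
        key = f"{tool_name}:{json.dumps(params, sort_keys=True)}"
        if key not in self._cache:
            self._cache[key] = await self.mcp_server.call_tool(tool_name, params)
        return self._cache[key]
```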
By integrating MCP with these AI frameworks and tools, you can extend its functionality to a much wider range of application scenarios.