Multi-tenant support for MCP is crucial for enterprise applications, allowing multiple customers or organizations to be served in isolation within a single MCP server instance. Here are detailed implementation methods:
Multi-Tenant Architecture Design
MCP multi-tenancy should consider the following aspects:
- Data Isolation: Ensure complete isolation of data for different tenants
- Resource Isolation: Isolate computing resources and quotas
- Security Isolation: Implement tenant-level authentication and authorization
- Performance Isolation: Prevent a single tenant from affecting other tenants
1. Tenant Identification and Context
pythonfrom typing import Optional from dataclasses import dataclass @dataclass class TenantContext: """Tenant context""" tenant_id: str tenant_name: str user_id: str permissions: list quotas: dict class TenantContextManager: def __init__(self): self.contexts = {} def create_context( self, tenant_id: str, tenant_name: str, user_id: str, permissions: list, quotas: dict = None ) -> TenantContext: """Create tenant context""" context = TenantContext( tenant_id=tenant_id, tenant_name=tenant_name, user_id=user_id, permissions=permissions, quotas=quotas or self._get_default_quotas(tenant_id) ) self.contexts[tenant_id] = context return context def get_context(self, tenant_id: str) -> Optional[TenantContext]: """Get tenant context""" return self.contexts.get(tenant_id) def set_current_context(self, tenant_id: str): """Set current tenant context""" context = self.get_context(tenant_id) if not context: raise ValueError(f"Tenant {tenant_id} does not exist") # Use thread-local storage or async context variables import contextvars current_tenant.set(context) def _get_default_quotas(self, tenant_id: str) -> dict: """Get default quotas""" return { "max_tools": 100, "max_resources": 1000, "max_requests_per_minute": 1000, "max_storage_mb": 1024 } # Current tenant context variable current_tenant = contextvars.ContextVar('current_tenant', default=None)
2. Data Isolation
pythonfrom sqlalchemy import create_engine, Column, String, Integer, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, scoped_session Base = declarative_base() class TenantData(Base): """Tenant data table""" __tablename__ = 'tenant_data' id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, index=True) data_key = Column(String(100), nullable=False) data_value = Column(Text) __table_args__ = ( # Ensure tenant isolation Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True), ) class MultiTenantDatabase: def __init__(self, database_url: str): self.engine = create_engine(database_url) Base.metadata.create_all(self.engine) self.SessionLocal = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=self.engine) ) def get_session(self, tenant_id: str): """Get tenant-specific database session""" session = self.SessionLocal() # Add tenant filter from sqlalchemy import event @event.listens_for(session, 'before_flush') def add_tenant_filter(session, context, instances): for instance in session.new: if hasattr(instance, 'tenant_id'): instance.tenant_id = tenant_id return session def query_tenant_data( self, tenant_id: str, data_key: str ) -> Optional[str]: """Query tenant data""" session = self.get_session(tenant_id) try: result = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() return result.data_value if result else None finally: session.close() def save_tenant_data( self, tenant_id: str, data_key: str, data_value: str ): """Save tenant data""" session = self.get_session(tenant_id) try: existing = session.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == data_key ).first() if existing: existing.data_value = data_value else: new_data = TenantData( tenant_id=tenant_id, data_key=data_key, data_value=data_value ) session.add(new_data) session.commit() except Exception as e: session.rollback() raise e finally: session.close()
3. Resource Quota Management
pythonfrom collections import defaultdict import time class QuotaManager: def __init__(self): self.quotas = {} self.usage = defaultdict(lambda: defaultdict(int)) self.rate_limits = {} def set_quota( self, tenant_id: str, quota_type: str, limit: int ): """Set tenant quota""" if tenant_id not in self.quotas: self.quotas[tenant_id] = {} self.quotas[tenant_id][quota_type] = limit def check_quota( self, tenant_id: str, quota_type: str, amount: int = 1 ) -> bool: """Check if quota is sufficient""" if tenant_id not in self.quotas: return True limit = self.quotas[tenant_id].get(quota_type) if limit is None: return True current_usage = self.usage[tenant_id][quota_type] return current_usage + amount <= limit def consume_quota( self, tenant_id: str, quota_type: str, amount: int = 1 ) -> bool: """Consume quota""" if not self.check_quota(tenant_id, quota_type, amount): return False self.usage[tenant_id][quota_type] += amount return True def get_usage( self, tenant_id: str, quota_type: str ) -> int: """Get usage""" return self.usage[tenant_id][quota_type] def reset_usage(self, tenant_id: str): """Reset usage""" if tenant_id in self.usage: self.usage[tenant_id].clear() def check_rate_limit( self, tenant_id: str, window: int = 60, max_requests: int = 100 ) -> bool: """Check rate limit""" now = time.time() if tenant_id not in self.rate_limits: self.rate_limits[tenant_id] = [] # Clean up expired request records self.rate_limits[tenant_id] = [ timestamp for timestamp in self.rate_limits[tenant_id] if now - timestamp < window ] # Check if limit exceeded if len(self.rate_limits[tenant_id]) >= max_requests: return False # Record new request self.rate_limits[tenant_id].append(now) return True
4. Tenant-Level Tools and Resources
pythonfrom mcp.server import Server from functools import wraps class MultiTenantServer(Server): def __init__(self, name: str, tenant_manager: TenantContextManager): super().__init__(name) self.tenant_manager = tenant_manager self.tenant_tools = defaultdict(dict) self.tenant_resources = defaultdict(dict) def tenant_tool( self, name: str, description: str, tenant_id: str = None ): """Tenant-specific tool decorator""" def decorator(func): # Register tool self.tenant_tools[tenant_id or "default"][name] = { "function": func, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # Get current tenant context = current_tenant.get() if not context: raise PermissionError("No tenant context found") # Check tenant permission if tenant_id and context.tenant_id != tenant_id: raise PermissionError("No permission to access this tool") # Execute tool return await func(*args, **kwargs) return wrapper return decorator def tenant_resource( self, uri: str, name: str, description: str, tenant_id: str = None ): """Tenant-specific resource decorator""" def decorator(func): # Register resource self.tenant_resources[tenant_id or "default"][uri] = { "function": func, "name": name, "description": description } @wraps(func) async def wrapper(*args, **kwargs): # Get current tenant context = current_tenant.get() if not context: raise PermissionError("No tenant context found") # Check tenant permission if tenant_id and context.tenant_id != tenant_id: raise PermissionError("No permission to access this resource") # Execute resource return await func(*args, **kwargs) return wrapper return decorator async def list_tools(self, tenant_id: str = None) -> list: """List available tools""" context = current_tenant.get() if not context: return [] # Get default tools and tenant-specific tools tools = [] # Add default tools for name, tool_info in self.tenant_tools["default"].items(): tools.append({ "name": name, "description": tool_info["description"] }) # Add tenant-specific tools if context.tenant_id in self.tenant_tools: for name, tool_info in self.tenant_tools[context.tenant_id].items(): tools.append({ "name": name, "description": tool_info["description"] }) return tools
5. Tenant Authentication and Authorization
pythonimport jwt from datetime import datetime, timedelta from typing import Dict, Any class TenantAuthenticator: def __init__(self, secret_key: str): self.secret_key = secret_key def generate_token( self, tenant_id: str, user_id: str, permissions: list, expires_in: int = 3600 ) -> str: """Generate tenant token""" payload = { "tenant_id": tenant_id, "user_id": user_id, "permissions": permissions, "exp": datetime.utcnow() + timedelta(seconds=expires_in), "iat": datetime.utcnow() } token = jwt.encode(payload, self.secret_key, algorithm="HS256") return token def verify_token(self, token: str) -> Dict[str, Any]: """Verify tenant token""" try: payload = jwt.decode(token, self.secret_key, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise ValueError("Token has expired") except jwt.InvalidTokenError: raise ValueError("Invalid token") def check_permission( self, token: str, required_permission: str ) -> bool: """Check permission""" payload = self.verify_token(token) permissions = payload.get("permissions", []) return required_permission in permissions or "admin" in permissions
6. Tenant Monitoring and Reporting
pythonfrom collections import defaultdict from datetime import datetime, timedelta class TenantMonitor: def __init__(self): self.metrics = defaultdict(lambda: defaultdict(list)) def record_metric( self, tenant_id: str, metric_name: str, value: float ): """Record metric""" timestamp = datetime.now() self.metrics[tenant_id][metric_name].append({ "value": value, "timestamp": timestamp }) # Limit history size if len(self.metrics[tenant_id][metric_name]) > 1000: self.metrics[tenant_id][metric_name] = \ self.metrics[tenant_id][metric_name][-1000:] def get_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> list: """Get metrics""" if tenant_id not in self.metrics: return [] if metric_name not in self.metrics[tenant_id]: return [] records = self.metrics[tenant_id][metric_name] if since: records = [ record for record in records if record["timestamp"] >= since ] return records def get_aggregated_metrics( self, tenant_id: str, metric_name: str, since: datetime = None ) -> dict: """Get aggregated metrics""" records = self.get_metrics(tenant_id, metric_name, since) if not records: return {} values = [record["value"] for record in records] return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values) } def generate_tenant_report( self, tenant_id: str, since: datetime = None ) -> dict: """Generate tenant report""" if not since: since = datetime.now() - timedelta(days=7) report = { "tenant_id": tenant_id, "period": { "start": since, "end": datetime.now() }, "metrics": {} } if tenant_id in self.metrics: for metric_name in self.metrics[tenant_id]: report["metrics"][metric_name] = \ self.get_aggregated_metrics(tenant_id, metric_name, since) return report
Best Practices:
- Data Isolation: Use tenant ID as primary key or index for all data tables
- Quota Management: Set reasonable resource quotas for each tenant
- Permission Control: Implement fine-grained tenant-level permission control
- Performance Monitoring: Monitor resource usage for each tenant
- Security Auditing: Log all tenant operations for auditing
- Elastic Scaling: Dynamically scale resources based on tenant needs
Through comprehensive multi-tenant support, you can provide isolated, secure, and efficient services for multiple customers or organizations within a single MCP server instance.