乐闻世界logo
搜索文章和话题

How to implement streaming in MCP?

2月19日 21:34

MCP's streaming capabilities allow real-time transmission of large amounts of data or results from long-running operations. Here are detailed implementation methods:

Streaming Basics

MCP supports two streaming modes:

  1. Server Push Stream: Server actively pushes data
  2. Client Request Stream: Client requests streaming response

1. Streaming Tool Definition

python
from typing import Any, AsyncIterator, Dict

from mcp.server import Server
from mcp.types import Tool
import asyncio

server = Server("my-mcp-server")


@server.tool(
    name="stream_data",
    description="Stream large amounts of data"
)
async def stream_data(
    count: int,
    batch_size: int = 10
) -> AsyncIterator[Dict[str, Any]]:
    """Yield `count` integers in batches of at most `batch_size` items.

    Emits one "data" chunk per batch (with a progress fraction in [0, 1]),
    then a final "done" chunk carrying the total item count.
    """
    for i in range(0, count, batch_size):
        batch = list(range(i, min(i + batch_size, count)))
        # Return a batch of data.  Progress is clamped to the actual number
        # of items emitted so the last partial batch never reports > 1.0.
        yield {
            "type": "data",
            "batch": batch,
            "progress": min(i + batch_size, count) / count,
        }
        # Simulate processing delay
        await asyncio.sleep(0.1)
    # Send completion signal
    yield {
        "type": "done",
        "total": count,
    }

2. Streaming Response Processor

python
import asyncio  # was missing: asyncio.get_event_loop() is used below
import json
from typing import AsyncIterator, Dict, Any


class StreamProcessor:
    """Relays chunks from a stream while tracking per-stream status.

    Status entries live in `active_streams` only while the stream is being
    consumed; they are removed in the `finally` block when the stream ends.
    """

    def __init__(self):
        # stream_id -> {"status": str, "start_time": float, ["error": str]}
        self.active_streams = {}

    async def process_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Process a streaming response, yielding one processed chunk per input chunk.

        On an exception from the underlying stream, yields a single
        {"type": "error", ...} chunk instead of propagating.
        """
        self.active_streams[stream_id] = {
            "status": "active",
            "start_time": asyncio.get_event_loop().time()
        }
        try:
            async for chunk in stream:
                # Process each chunk
                processed = await self._process_chunk(chunk)
                # Update stream status
                if processed.get("type") == "done":
                    self.active_streams[stream_id]["status"] = "completed"
                yield processed
        except Exception as e:
            self.active_streams[stream_id]["status"] = "error"
            self.active_streams[stream_id]["error"] = str(e)
            yield {
                "type": "error",
                "error": str(e)
            }
        finally:
            # Clean up stream
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def _process_chunk(
        self,
        chunk: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Normalize a single chunk, stamping it with the loop's current time."""
        chunk_type = chunk.get("type")
        if chunk_type == "data":
            # Process data chunk
            return {
                "type": "data",
                "data": chunk.get("batch"),
                "progress": chunk.get("progress"),
                "timestamp": asyncio.get_event_loop().time()
            }
        elif chunk_type == "done":
            # Process completion signal
            return {
                "type": "done",
                "total": chunk.get("total"),
                "timestamp": asyncio.get_event_loop().time()
            }
        # Unknown chunk types pass through unchanged.
        return chunk

    def get_stream_status(self, stream_id: str) -> Dict[str, Any]:
        """Return the tracked status for an active stream, or not_found."""
        return self.active_streams.get(stream_id, {
            "status": "not_found"
        })

3. Streaming Data Aggregator

python
class StreamAggregator:
    """Collects the "data" payloads of a stream and reduces them on completion."""

    def __init__(self):
        # stream_id -> list of payloads for streams currently being consumed
        self.buffers = {}
        self.aggregators = {}

    async def aggregate_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]],
        aggregation_func: callable
    ) -> Dict[str, Any]:
        """Drain `stream`, buffering "data" payloads until a "done" chunk arrives.

        Returns an "aggregated" result produced by `aggregation_func`, or an
        "error" result when the stream ends without a completion chunk.  The
        buffer is always dropped from `self.buffers` afterwards.
        """
        collected = []
        self.buffers[stream_id] = collected
        try:
            async for chunk in stream:
                kind = chunk.get("type")
                if kind == "data":
                    collected.append(chunk.get("data"))
                    continue
                if kind == "done":
                    # Execute aggregation over everything buffered so far.
                    return {
                        "type": "aggregated",
                        "result": aggregation_func(collected),
                        "count": len(collected),
                    }
            return {
                "type": "error",
                "error": "Stream ended without completion",
            }
        finally:
            self.buffers.pop(stream_id, None)

    def get_buffer(self, stream_id: str) -> list:
        """Return the in-flight buffer for a stream (empty list when unknown)."""
        return self.buffers.get(stream_id, [])

4. Streaming Progress Tracking

python
import time
from typing import Dict, Any


class StreamProgressTracker:
    """Tracks processed/total counts and timing for in-flight streams.

    Uses time.monotonic() rather than asyncio.get_event_loop().time():
    these are synchronous methods that may run with no event loop, where
    get_event_loop() is deprecated (and raises on recent Python versions).
    """

    def __init__(self):
        # stream_id -> {"total", "processed", "start_time", "last_update"}
        self.progress = {}

    def track_stream(
        self,
        stream_id: str,
        total_items: int
    ):
        """Start tracking stream progress."""
        now = time.monotonic()
        self.progress[stream_id] = {
            "total": total_items,
            "processed": 0,
            "start_time": now,
            "last_update": now,
        }

    def update_progress(
        self,
        stream_id: str,
        processed: int
    ):
        """Update the processed count; silently ignores unknown streams."""
        if stream_id not in self.progress:
            return
        entry = self.progress[stream_id]
        entry["processed"] = processed
        entry["last_update"] = time.monotonic()

    def get_progress(self, stream_id: str) -> Dict[str, Any]:
        """Return progress info: totals, percentage, elapsed and ETA seconds."""
        if stream_id not in self.progress:
            return {
                "status": "not_found"
            }
        info = self.progress[stream_id]
        total = info["total"]
        return {
            "total": total,
            "processed": info["processed"],
            # Guard against ZeroDivisionError: an empty stream is complete.
            "percentage": (info["processed"] / total) * 100 if total else 100.0,
            "elapsed": time.monotonic() - info["start_time"],
            "estimated_remaining": self._estimate_remaining(info),
        }

    def _estimate_remaining(self, info: Dict[str, Any]) -> float:
        """Estimate remaining seconds from the average processing rate so far."""
        if info["processed"] == 0:
            return 0.0
        elapsed = time.monotonic() - info["start_time"]
        # Guard: with a monotonic clock elapsed can be 0.0 immediately
        # after track_stream; avoid dividing by zero.
        if elapsed <= 0:
            return 0.0
        rate = info["processed"] / elapsed
        if rate == 0:
            return 0.0
        return (info["total"] - info["processed"]) / rate

5. Streaming Error Handling

python
class StreamErrorHandler:
    """Dispatches stream errors to async handlers registered by exception name."""

    def __init__(self):
        # exception class name -> async handler(error, stream_id)
        self.error_handlers = {}

    def register_handler(
        self,
        error_type: str,
        handler: callable
    ):
        """Register an async handler for the given exception type name."""
        self.error_handlers[error_type] = handler

    async def handle_error(
        self,
        error: Exception,
        stream_id: str
    ) -> Dict[str, Any]:
        """Resolve a handler by the error's class name and run it.

        Returns a "handled" result on success, or an "error" result when no
        handler is registered or the handler itself raises.
        """
        error_type = type(error).__name__
        handler = self.error_handlers.get(error_type)
        if handler is None:
            # Default error handling
            return {
                "type": "error",
                "error": str(error),
                "error_type": error_type,
            }
        try:
            outcome = await handler(error, stream_id)
        except Exception as e:
            return {
                "type": "error",
                "error": f"Error handler failed: {str(e)}",
            }
        return {
            "type": "handled",
            "result": outcome,
        }

6. Streaming Resource Management

python
class StreamResourceManager:
    """Per-stream registry of resources (files, connections, ...) keyed by type."""

    def __init__(self):
        # stream_id -> {resource_type: resource}
        self.resources = {}

    def allocate_resource(
        self,
        stream_id: str,
        resource_type: str,
        resource: Any
    ):
        """Register a resource under (stream_id, resource_type)."""
        if stream_id not in self.resources:
            self.resources[stream_id] = {}
        self.resources[stream_id][resource_type] = resource

    def get_resource(
        self,
        stream_id: str,
        resource_type: str
    ) -> Any:
        """Return the registered resource, or None when unknown."""
        if stream_id not in self.resources:
            return None
        return self.resources[stream_id].get(resource_type)

    def release_resources(self, stream_id: str):
        """Release and forget all resources registered for a stream."""
        if stream_id not in self.resources:
            return
        resources = self.resources[stream_id]
        # Clean up resources
        for resource_type, resource in resources.items():
            if hasattr(resource, 'close'):
                resource.close()
            elif hasattr(resource, '__aexit__'):
                # Bug fix: the original tested __aenter__ but invoked
                # __aexit__; test for the method actually called.
                coro = resource.__aexit__(None, None, None)
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    loop = None
                if loop is not None:
                    # Inside a running loop: schedule the async cleanup.
                    loop.create_task(coro)
                else:
                    # Bug fix: asyncio.create_task() raises with no running
                    # loop; run the async exit to completion instead.
                    asyncio.run(coro)
        del self.resources[stream_id]

7. Streaming Performance Monitoring

python
import time
from typing import Dict, Any


class StreamPerformanceMonitor:
    """Collects per-stream throughput metrics: chunk count, bytes, errors.

    Uses time.monotonic() rather than asyncio.get_event_loop().time():
    these are synchronous methods that may run with no event loop, where
    get_event_loop() is deprecated (and raises on recent Python versions).
    """

    def __init__(self):
        # stream_id -> {"start_time", "chunks", "bytes", "errors"}
        self.metrics = {}

    def start_monitoring(self, stream_id: str):
        """Begin collecting metrics for a stream (resets any prior entry)."""
        self.metrics[stream_id] = {
            "start_time": time.monotonic(),
            "chunks": 0,
            "bytes": 0,
            "errors": 0,
        }

    def record_chunk(self, stream_id: str, size: int):
        """Count one chunk of `size` bytes; ignores unmonitored streams."""
        if stream_id not in self.metrics:
            return
        self.metrics[stream_id]["chunks"] += 1
        self.metrics[stream_id]["bytes"] += size

    def record_error(self, stream_id: str):
        """Count one error; ignores unmonitored streams."""
        if stream_id not in self.metrics:
            return
        self.metrics[stream_id]["errors"] += 1

    def get_metrics(self, stream_id: str) -> Dict[str, Any]:
        """Return raw counters plus derived rates (guarded against /0)."""
        if stream_id not in self.metrics:
            return {
                "status": "not_found"
            }
        metrics = self.metrics[stream_id]
        elapsed = time.monotonic() - metrics["start_time"]
        return {
            "elapsed": elapsed,
            "chunks": metrics["chunks"],
            "bytes": metrics["bytes"],
            "errors": metrics["errors"],
            "chunks_per_second": metrics["chunks"] / elapsed if elapsed > 0 else 0,
            "bytes_per_second": metrics["bytes"] / elapsed if elapsed > 0 else 0,
            "error_rate": metrics["errors"] / metrics["chunks"] if metrics["chunks"] > 0 else 0,
        }

Best Practices:

  1. Reasonable Chunking: Choose appropriate chunk sizes based on network conditions and data characteristics
  2. Progress Feedback: Provide clear progress information to improve user experience
  3. Error Recovery: Implement error recovery mechanisms to improve system robustness
  4. Resource Management: Release stream resources promptly to avoid memory leaks
  5. Performance Monitoring: Monitor stream performance to detect and resolve issues promptly
  6. Timeout Control: Set reasonable timeout values to prevent infinite waiting

Through comprehensive streaming mechanisms, you can efficiently handle large amounts of data and long-running operations.

标签:MCP