乐闻世界logo
搜索文章和话题

How to implement a WebSocket server in Node.js?

2月18日 18:59

Node.js offers several ways to implement a WebSocket server; the most commonly used is the ws library.

Implementing WebSocket Server with ws Library

Basic Server Implementation

javascript
const WebSocket = require('ws');

// Create a standalone WebSocket server on port 8080.
const wss = new WebSocket.Server({ port: 8080 });

// Announce the address only once the underlying server is actually bound;
// logging right after construction would claim success even if the bind fails.
wss.on('listening', () => {
  console.log('WebSocket server running on ws://localhost:8080');
});

// Server-level failures (for example the port already being in use) are
// emitted as 'error'; without a listener they surface as an unhandled event.
wss.on('error', (error) => {
  console.error('WebSocket server error:', error);
});

wss.on('connection', (ws, request) => {
  console.log('New client connected');

  // Get client information
  const ip = request.socket.remoteAddress;
  console.log('Client IP:', ip);

  // Send welcome message
  ws.send(JSON.stringify({ type: 'welcome', message: 'Welcome to WebSocket server' }));

  // Receive messages
  ws.on('message', (message) => {
    const text = message.toString();
    console.log('Received message:', text);

    // Parse separately from dispatch so that only genuinely malformed input
    // is reported to the client as a format error.
    let data;
    try {
      data = JSON.parse(text);
    } catch (error) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
      return;
    }

    // Valid JSON that is not an object (null, numbers, strings) has no `type`.
    if (data === null || typeof data !== 'object') {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
      return;
    }

    try {
      handleMessage(ws, data);
    } catch (error) {
      // A failure here is a server-side problem, not a client format problem.
      console.error('Failed to handle message:', error);
    }
  });

  // Handle errors
  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
  });

  // Connection closed
  ws.on('close', (code, reason) => {
    console.log('Client disconnected, code:', code, 'reason:', reason.toString());
  });
});

/**
 * Dispatch a parsed client message by its `type` field.
 *
 * @param {WebSocket} ws - Connection the message arrived on.
 * @param {object} data - Parsed JSON payload; `data.type` selects the action.
 */
function handleMessage(ws, data) {
  switch (data.type) {
    case 'chat':
      broadcastMessage(data);
      break;
    case 'ping':
      ws.send(JSON.stringify({ type: 'pong' }));
      break;
    default:
      ws.send(JSON.stringify({ type: 'error', message: 'Unknown message type' }));
  }
}

/**
 * Send `data` (JSON-encoded) to every open client, optionally skipping one.
 *
 * @param {object} data - Payload to broadcast.
 * @param {WebSocket|null} [excludeWs=null] - Connection to leave out.
 */
function broadcastMessage(data, excludeWs = null) {
  // Serialize once instead of once per recipient.
  const payload = JSON.stringify(data);
  wss.clients.forEach((client) => {
    if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}

Integration with HTTP Server

javascript
const http = require('http');
const WebSocket = require('ws');

/** Answer every plain HTTP request with a fixed status message. */
function respondToHttp(req, res) {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  res.end('WebSocket server is running');
}

/** Log each incoming frame and send it back prefixed with "Server reply: ". */
function attachEcho(ws) {
  ws.on('message', (message) => {
    console.log('Received:', message.toString());
    ws.send(`Server reply: ${message}`);
  });
}

/** Log the HTTP and WebSocket addresses once the shared port is listening. */
function announce() {
  console.log(`Server running on http://localhost:${PORT}`);
  console.log(`WebSocket running on ws://localhost:${PORT}`);
}

// The HTTP server owns the port; the WebSocket server is attached to it.
const server = http.createServer(respondToHttp);
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws) => {
  console.log('WebSocket connection established');
  attachEcho(ws);
});

// Start server
const PORT = 3000;
server.listen(PORT, announce);

Authentication Middleware

javascript
const WebSocket = require('ws');
const jwt = require('jsonwebtoken');

// Prefer a secret supplied through the environment. The literal fallback is a
// placeholder kept for backward compatibility and must not be used in production.
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';

/**
 * `verifyClient` hook for the `ws` server: accept the handshake only when the
 * URL carries a valid JWT in its `token` query parameter.
 *
 * `ws` calls this hook with an info object (`{ origin, req, secure }`) and a
 * callback of the form `callback(result, code, name)`, where `result` is a
 * boolean. It is NOT a Node-style error-first callback: passing an Error
 * object as the first argument is truthy and would accept the connection,
 * while `callback(null, true)` would reject it.
 *
 * @param {{ origin: string, req: import('http').IncomingMessage, secure: boolean }} info
 *   Handshake details supplied by `ws`.
 * @param {(result: boolean, code?: number, name?: string) => void} callback
 *   Invoked with `true` to accept, or `false` plus an HTTP status to reject.
 */
function authenticateClient(info, callback) {
  const request = info.req;

  // Get token from URL parameters. The base is only needed because
  // `request.url` is a path, not an absolute URL.
  const token = new URL(request.url, 'http://localhost').searchParams.get('token');

  if (!token) {
    console.warn('Rejected connection: Missing authentication token');
    return callback(false, 401, 'Unauthorized');
  }

  // Verify token
  jwt.verify(token, JWT_SECRET, (err, decoded) => {
    if (err) {
      console.warn('Rejected connection: Invalid token');
      return callback(false, 401, 'Unauthorized');
    }

    // Attach user info to the request object; the same object is passed to
    // the 'connection' listener below.
    request.user = decoded;
    callback(true);
  });
}

const wss = new WebSocket.Server({
  port: 8080,
  verifyClient: authenticateClient
});

wss.on('connection', (ws, request) => {
  const user = request.user;
  console.log(`User ${user.username} connected`);

  ws.send(JSON.stringify({ type: 'authenticated', user: user.username }));
});

Room Functionality Implementation

javascript
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

// Store rooms and users: roomId -> Map(userId -> WebSocket)
const rooms = new Map();

wss.on('connection', (ws, request) => {
  // Per-connection state: the room this socket is in and the id it joined as.
  let currentRoom = null;
  let userId = null;

  ws.on('message', (message) => {
    // Client input is untrusted: a malformed frame must not throw out of the
    // event handler, which would take the whole process down.
    let data;
    try {
      data = JSON.parse(message.toString());
    } catch (error) {
      sendError(ws, 'Invalid message format');
      return;
    }
    if (data === null || typeof data !== 'object') {
      sendError(ws, 'Invalid message format');
      return;
    }

    switch (data.type) {
      case 'join':
        handleJoin(ws, data.roomId, data.userId);
        break;
      case 'leave':
        handleLeave(ws);
        break;
      case 'message':
        handleMessage(ws, data);
        break;
    }
  });

  /** Send a JSON error frame to a single client. */
  function sendError(ws, text) {
    ws.send(JSON.stringify({ type: 'error', message: text }));
  }

  /**
   * Put this connection into `roomId` under the identifier `uid`, leaving any
   * previous room first, then notify the room and confirm to the caller.
   */
  function handleJoin(ws, roomId, uid) {
    // Without both ids the socket would be stored under an `undefined` key
    // that is never cleaned up.
    if (roomId === undefined || roomId === null || uid === undefined || uid === null) {
      sendError(ws, 'roomId and userId are required');
      return;
    }

    // If already in a room, leave first
    if (currentRoom !== null) {
      handleLeave(ws);
    }

    currentRoom = roomId;
    userId = uid;

    // Create room (if not exists)
    if (!rooms.has(roomId)) {
      rooms.set(roomId, new Map());
    }
    const room = rooms.get(roomId);
    room.set(userId, ws);

    // Notify other users in the room
    broadcastToRoom(roomId, { type: 'user_joined', userId: userId }, ws);

    ws.send(JSON.stringify({
      type: 'joined',
      roomId: roomId,
      users: Array.from(room.keys())
    }));
  }

  /** Remove this connection from its current room, if any, and reset state. */
  function handleLeave(ws) {
    // Compare against null rather than truthiness so ids such as 0 or ''
    // are still cleaned up.
    if (currentRoom !== null && rooms.has(currentRoom)) {
      const room = rooms.get(currentRoom);

      // Only remove the entry if it still points at this socket; another
      // connection may have joined under the same userId and replaced it.
      if (room.get(userId) === ws) {
        room.delete(userId);

        // Notify other users in the room
        broadcastToRoom(currentRoom, { type: 'user_left', userId: userId });
      }

      // Delete room if empty
      if (room.size === 0) {
        rooms.delete(currentRoom);
      }
    }

    currentRoom = null;
    userId = null;
  }

  /** Relay a chat message from this connection to everyone in its room. */
  function handleMessage(ws, data) {
    if (currentRoom === null) {
      sendError(ws, 'Please join a room first');
      return;
    }

    broadcastToRoom(currentRoom, {
      type: 'chat',
      userId: userId,
      message: data.message,
      timestamp: Date.now()
    });
  }

  /**
   * Send `message` (JSON-encoded) to every open socket in `roomId`,
   * optionally skipping `excludeWs`.
   */
  function broadcastToRoom(roomId, message, excludeWs = null) {
    if (!rooms.has(roomId)) return;

    const room = rooms.get(roomId);
    const data = JSON.stringify(message);

    room.forEach((clientWs) => {
      if (clientWs !== excludeWs && clientWs.readyState === WebSocket.OPEN) {
        clientWs.send(data);
      }
    });
  }

  // A dropped connection leaves its room the same way an explicit 'leave' does.
  ws.on('close', () => {
    handleLeave(ws);
  });
});

Heartbeat Detection Implementation

javascript
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

const HEARTBEAT_INTERVAL = 30000; // 30 seconds between pings
const HEARTBEAT_TIMEOUT = 5000; // 5 second grace period for the pong to arrive

// Pings are only sent once per interval, so the most recent pong of a healthy
// client can legitimately be almost one full interval old when the next check
// runs. Comparing the pong age against HEARTBEAT_TIMEOUT alone would therefore
// terminate every connection on each tick.
const MAX_PONG_AGE = HEARTBEAT_INTERVAL + HEARTBEAT_TIMEOUT;

wss.on('connection', (ws) => {
  ws.isAlive = true;
  ws.lastPong = Date.now();

  // Listen for pong frames (replies to ws.ping() below)
  ws.on('pong', () => {
    ws.isAlive = true;
    ws.lastPong = Date.now();
  });

  // Listen for application-level messages
  ws.on('message', (message) => {
    // Client input is untrusted; a malformed frame must not throw out of the
    // event handler and crash the process.
    let data;
    try {
      data = JSON.parse(message.toString());
    } catch (error) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
      return;
    }

    // Application-level ping, distinct from the protocol ping frames above.
    if (data !== null && typeof data === 'object' && data.type === 'ping') {
      ws.send(JSON.stringify({ type: 'pong' }));
    }
  });
});

// Periodically check connection status
const interval = setInterval(() => {
  wss.clients.forEach((ws) => {
    // Check if the last pong is older than one interval plus the grace period
    if (Date.now() - ws.lastPong > MAX_PONG_AGE) {
      console.log('Connection timeout, closing connection');
      return ws.terminate();
    }

    // isAlive is still false if the previous ping was never answered
    if (ws.isAlive === false) {
      console.log('Connection not responding, closing connection');
      return ws.terminate();
    }

    // Mark as pending and send ping; the 'pong' listener flips it back.
    ws.isAlive = false;
    ws.ping();
  });
}, HEARTBEAT_INTERVAL);

// Stop the timer when the server shuts down so it does not keep the process alive.
wss.on('close', () => {
  clearInterval(interval);
});

Load Balancing Support

javascript
const WebSocket = require('ws');
const Redis = require('ioredis');

const redis = new Redis();
const wss = new WebSocket.Server({ port: 8080 });

// Current server ID
const SERVER_ID = 'server-1';

// Subscribe to Redis channel. A dedicated connection is used for subscribing,
// separate from the one used for regular commands and publishing.
const subscriber = new Redis();
subscriber.subscribe('websocket_broadcast').catch((error) => {
  console.error('Failed to subscribe to websocket_broadcast:', error);
});

// Handle messages from other servers
subscriber.on('message', (channel, message) => {
  let data;
  try {
    data = JSON.parse(message);
  } catch (error) {
    console.error('Ignoring malformed broadcast payload:', error);
    return;
  }

  // Only process messages not sent by self; local clients were already
  // served directly when the message arrived.
  if (data.serverId !== SERVER_ID) {
    broadcastToRoom(data.roomId, data.message);
  }
});

wss.on('connection', (ws, request) => {
  const userId = getUserId(request);
  const roomId = getRoomId(request);

  // Remember the room on the socket so broadcasts can be filtered by room.
  ws.roomId = roomId;

  // Register connection to Redis
  redis
    .hset('websocket_connections', userId, JSON.stringify({
      serverId: SERVER_ID,
      roomId: roomId
    }))
    .catch((error) => {
      console.error('Failed to register connection in Redis:', error);
    });

  ws.on('message', (message) => {
    // Client input is untrusted; a malformed frame must not crash the process.
    let data;
    try {
      data = JSON.parse(message.toString());
    } catch (error) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
      return;
    }

    // Broadcast to clients on current server
    broadcastToRoom(roomId, data);

    // Publish to Redis, notify other servers
    redis
      .publish('websocket_broadcast', JSON.stringify({
        serverId: SERVER_ID,
        roomId: roomId,
        message: data
      }))
      .catch((error) => {
        console.error('Failed to publish broadcast to Redis:', error);
      });
  });

  ws.on('close', () => {
    // Delete connection from Redis
    redis.hdel('websocket_connections', userId).catch((error) => {
      console.error('Failed to remove connection from Redis:', error);
    });
  });
});

/**
 * Send `message` (JSON-encoded) to every open client on this server whose
 * connection was registered under `roomId`.
 *
 * @param {string|undefined} roomId - Room to deliver to.
 * @param {*} message - JSON-serializable payload.
 */
function broadcastToRoom(roomId, message) {
  // Serialize once instead of once per recipient.
  const payload = JSON.stringify(message);

  wss.clients.forEach((client) => {
    // Filter on the room recorded at connection time; without this check the
    // message would reach every client regardless of room.
    if (client.roomId === roomId && client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}

/** Get user ID from the `x-user-id` request header. */
function getUserId(request) {
  return request.headers['x-user-id'];
}

/** Get room ID from the `x-room-id` request header. */
function getRoomId(request) {
  return request.headers['x-room-id'];
}

Best Practices

  1. Error Handling: Properly handle all possible errors
  2. Resource Cleanup: Clean up related resources when connection closes
  3. Authentication and Authorization: Implement comprehensive authentication and authorization mechanisms
  4. Heartbeat Detection: Periodically check connection status
  5. Message Validation: Validate all received messages
  6. Logging: Record important events and errors
  7. Performance Monitoring: Monitor server performance metrics
  8. Load Balancing: Support multi-server deployment
标签:WebSocket