如何在Node.js中实现WebSocket服务器?
Node.js提供了多种方式实现WebSocket服务器,最常用的是ws库。使用ws库实现WebSocket服务器基础服务器实现const WebSocket = require('ws');// 创建WebSocket服务器const wss = new WebSocket.Server({ port: 8080 });console.log('WebSocket服务器运行在 ws://localhost:8080');wss.on('connection', (ws, request) => { console.log('新客户端连接'); // 获取客户端信息 const ip = request.socket.remoteAddress; console.log('客户端IP:', ip); // 发送欢迎消息 ws.send(JSON.stringify({ type: 'welcome', message: '欢迎连接到WebSocket服务器' })); // 接收消息 ws.on('message', (message) => { console.log('收到消息:', message.toString()); try { const data = JSON.parse(message); handleMessage(ws, data); } catch (error) { ws.send(JSON.stringify({ type: 'error', message: '消息格式错误' })); } }); // 处理错误 ws.on('error', (error) => { console.error('WebSocket错误:', error); }); // 连接关闭 ws.on('close', (code, reason) => { console.log('客户端断开连接, code:', code, 'reason:', reason.toString()); });});function handleMessage(ws, data) { switch (data.type) { case 'chat': broadcastMessage(data); break; case 'ping': ws.send(JSON.stringify({ type: 'pong' })); break; default: ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' })); }}function broadcastMessage(data, excludeWs = null) { wss.clients.forEach((client) => { if (client !== excludeWs && client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(data)); } });}与HTTP服务器集成const http = require('http');const WebSocket = require('ws');// 创建HTTP服务器const server = http.createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('WebSocket服务器运行中');});// 创建WebSocket服务器,绑定到HTTP服务器const wss = new WebSocket.Server({ server });wss.on('connection', (ws) => { console.log('WebSocket连接建立'); ws.on('message', (message) => { console.log('收到:', message.toString()); ws.send(`服务器回复: ${message}`); });});// 启动服务器const PORT = 3000;server.listen(PORT, () => { console.log(`服务器运行在 http://localhost:${PORT}`); console.log(`WebSocket运行在 ws://localhost:${PORT}`);});认证中间件const WebSocket = require('ws');const jwt = require('jsonwebtoken');const JWT_SECRET = 'your-secret-key';function authenticateClient(request, callback) { // 从URL参数获取token const token = new URL(request.url, 'http://localhost').searchParams.get('token'); if (!token) { return callback(new Error('缺少认证token')); } // 验证token jwt.verify(token, JWT_SECRET, (err, decoded) => { if (err) { return callback(new Error('无效的token')); } // 将用户信息附加到request对象 request.user = decoded; callback(null, true); });}const wss = new WebSocket.Server({ port: 8080, verifyClient: authenticateClient});wss.on('connection', (ws, request) => { const user = request.user; console.log(`用户 ${user.username} 已连接`); ws.send(JSON.stringify({ type: 'authenticated', user: user.username }));});房间功能实现const WebSocket = require('ws');const wss = new WebSocket.Server({ port: 8080 });// 存储房间和用户const rooms = new Map();wss.on('connection', (ws, request) => { let currentRoom = null; let userId = null; ws.on('message', (message) => { const data = JSON.parse(message.toString()); switch (data.type) { case 'join': handleJoin(ws, data.roomId, data.userId); break; case 'leave': handleLeave(ws); break; case 'message': handleMessage(ws, data); break; } }); function handleJoin(ws, roomId, uid) { // 如果已在房间,先离开 if (currentRoom) { handleLeave(ws); } currentRoom = roomId; userId = uid; // 创建房间(如果不存在) if (!rooms.has(roomId)) { rooms.set(roomId, new Map()); } const room = rooms.get(roomId); room.set(userId, ws); // 通知房间内其他用户 broadcastToRoom(roomId, { type: 'user_joined', userId: userId }, ws); ws.send(JSON.stringify({ type: 'joined', roomId: roomId, users: Array.from(room.keys()) })); } function handleLeave(ws) { if (currentRoom && rooms.has(currentRoom)) { const room = rooms.get(currentRoom); room.delete(userId); // 通知房间内其他用户 broadcastToRoom(currentRoom, { type: 'user_left', userId: userId }); // 如果房间为空,删除房间 if (room.size === 0) { rooms.delete(currentRoom); } } currentRoom = null; userId = null; } function handleMessage(ws, data) { if (!currentRoom) { ws.send(JSON.stringify({ type: 'error', message: '请先加入房间' })); return; } broadcastToRoom(currentRoom, { type: 'chat', userId: userId, message: data.message, timestamp: Date.now() }); } function broadcastToRoom(roomId, message, excludeWs = null) { if (!rooms.has(roomId)) return; const room = rooms.get(roomId); const data = JSON.stringify(message); room.forEach((clientWs) => { if (clientWs !== excludeWs && clientWs.readyState === WebSocket.OPEN) { clientWs.send(data); } }); } ws.on('close', () => { handleLeave(ws); });});心跳检测实现const WebSocket = require('ws');const wss = new WebSocket.Server({ port: 8080 });const HEARTBEAT_INTERVAL = 30000; // 30秒const HEARTBEAT_TIMEOUT = 5000; // 5秒超时wss.on('connection', (ws) => { ws.isAlive = true; ws.lastPong = Date.now(); // 监听pong消息 ws.on('pong', () => { ws.isAlive = true; ws.lastPong = Date.now(); }); // 监听消息 ws.on('message', (message) => { const data = JSON.parse(message.toString()); if (data.type === 'ping') { ws.send(JSON.stringify({ type: 'pong' })); } });});// 定期检查连接状态const interval = setInterval(() => { wss.clients.forEach((ws) => { // 检查是否超时 if (Date.now() - ws.lastPong > HEARTBEAT_TIMEOUT) { console.log('连接超时,关闭连接'); return ws.terminate(); } // 发送ping if (ws.isAlive === false) { console.log('连接无响应,关闭连接'); return ws.terminate(); } ws.isAlive = false; ws.ping(); });}, HEARTBEAT_INTERVAL);wss.on('close', () => { clearInterval(interval);});负载均衡支持const WebSocket = require('ws');const Redis = require('ioredis');const redis = new Redis();const wss = new WebSocket.Server({ port: 8080 });// 当前服务器IDconst SERVER_ID = 'server-1';// 订阅Redis频道const subscriber = new Redis();subscriber.subscribe('websocket_broadcast');// 处理来自其他服务器的消息subscriber.on('message', (channel, message) => { const data = JSON.parse(message); // 只处理不是自己发送的消息 if (data.serverId !== SERVER_ID) { broadcastToRoom(data.roomId, data.message); }});wss.on('connection', (ws, request) => { const userId = getUserId(request); const roomId = getRoomId(request); // 注册连接到Redis redis.hset('websocket_connections', userId, JSON.stringify({ serverId: SERVER_ID, roomId: roomId })); ws.on('message', (message) => { const data = JSON.parse(message.toString()); // 广播到当前服务器的客户端 broadcastToRoom(roomId, data); // 发布到Redis,通知其他服务器 redis.publish('websocket_broadcast', JSON.stringify({ serverId: SERVER_ID, roomId: roomId, message: data })); }); ws.on('close', () => { // 从Redis删除连接 redis.hdel('websocket_connections', userId); });});function broadcastToRoom(roomId, message) { wss.clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(message)); } });}function getUserId(request) { // 从请求中获取用户ID return request.headers['x-user-id'];}function getRoomId(request) { // 从请求中获取房间ID return request.headers['x-room-id'];}最佳实践错误处理:妥善处理所有可能的错误资源清理:连接关闭时清理相关资源认证授权:实现完善的认证机制心跳检测:定期检查连接状态消息验证:验证所有接收到的消息日志记录:记录重要事件和错误性能监控:监控服务器性能指标负载均衡:支持多服务器部署