GraphQL 订阅(Subscriptions)如何实现
# GraphQL 订阅(Subscriptions)实现详解

GraphQL 订阅允许客户端实时接收服务器推送的数据更新,是构建实时应用的关键功能。以下是 GraphQL 订阅的详细实现方案。

## 1. 订阅基础概念

### 订阅的工作原理

1. 客户端通过 WebSocket 建立持久连接
2. 客户端发送订阅查询
3. 服务器保持连接并监听事件
4. 当事件发生时,服务器推送数据到客户端
5. 客户端接收并处理更新

### 订阅 vs 轮询

| 特性 | 订阅 | 轮询 |
|------|------|------|
| 实时性 | 高 | 低 |
| 服务器负载 | 低(事件驱动) | 高(持续查询) |
| 网络开销 | 低(按需推送) | 高(定期请求) |
| 实现复杂度 | 高 | 低 |
| 适用场景 | 实时更新 | 定期检查 |

## 2. 服务器端实现

### 使用 graphql-subscriptions

```javascript
const { PubSub } = require('graphql-subscriptions');

const pubsub = new PubSub();

const POST_CREATED = 'POST_CREATED';
const POST_UPDATED = 'POST_UPDATED';
const COMMENT_ADDED = 'COMMENT_ADDED';

const typeDefs = `
  type Post {
    id: ID!
    title: String!
    content: String!
    author: User!
    createdAt: DateTime!
  }

  type Comment {
    id: ID!
    text: String!
    author: User!
    post: Post!
    createdAt: DateTime!
  }

  type Subscription {
    postCreated: Post!
    postUpdated(postId: ID!): Post!
    commentAdded(postId: ID!): Comment!
  }
`;

const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator([POST_CREATED])
    },
    postUpdated: {
      subscribe: (_, { postId }) => {
        const asyncIterator = pubsub.asyncIterator([POST_UPDATED]);
        return {
          [Symbol.asyncIterator]() {
            return (async function* () {
              for await (const event of asyncIterator) {
                // 过滤:只返回指定帖子的更新
                if (event.postUpdated.id === postId) {
                  yield event;
                }
              }
            })();
          }
        };
      }
    },
    commentAdded: {
      subscribe: (_, { postId }) => {
        const asyncIterator = pubsub.asyncIterator([COMMENT_ADDED]);
        return {
          [Symbol.asyncIterator]() {
            return (async function* () {
              for await (const event of asyncIterator) {
                if (event.commentAdded.postId === postId) {
                  yield event;
                }
              }
            })();
          }
        };
      }
    }
  },
  Mutation: {
    createPost: async (_, { input }) => {
      const post = await Post.create(input);
      pubsub.publish(POST_CREATED, { postCreated: post });
      return post;
    },
    updatePost: async (_, { id, input }) => {
      const post = await Post.update(id, input);
      pubsub.publish(POST_UPDATED, { postUpdated: post });
      return post;
    },
    addComment: async (_, { input }) => {
      const comment = await Comment.create(input);
      pubsub.publish(COMMENT_ADDED, { commentAdded: comment });
      return comment;
    }
  }
};
```

### 使用 Redis PubSub

```javascript
const { RedisPubSub } = require('graphql-redis-subscriptions');

const pubsub = new RedisPubSub({
  connection: {
    host: 'localhost',
    port: 6379,
    retry_strategy: (options) => {
      if (options.error && options.error.code === 'ECONNREFUSED') {
        return new Error('Redis connection refused');
      }
      if (options.total_retry_time > 1000 * 60 * 60) {
        return new Error('Redis retry time exhausted');
      }
      if (options.attempt > 10) {
        return undefined;
      }
      return Math.min(options.attempt * 100, 3000);
    }
  }
});

// 使用方式与内存 PubSub 相同
const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator([POST_CREATED])
    }
  }
};
```

## 3. Apollo Server 订阅实现

### 配置 Apollo Server

```javascript
const { ApolloServer } = require('apollo-server-express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const { useServer } = require('graphql-ws/lib/use/ws');

const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: ({ req, connection }) => {
    // HTTP 请求的 context
    if (req) {
      return { token: req.headers.authorization };
    }
    // WebSocket 连接的 context
    if (connection) {
      return { token: connection.context.authorization };
    }
  }
});

const httpServer = createServer(server);

// 创建 WebSocket 服务器
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql'
});

useServer(
  {
    schema: server.schema,
    context: (ctx) => {
      // 验证连接
      const token = ctx.connectionParams?.authorization;
      if (!token) {
        throw new Error('Unauthorized');
      }
      return { token };
    },
    onConnect: (ctx) => {
      console.log('Client connected');
      return { authorization: ctx.connectionParams?.authorization };
    },
    onDisconnect: (ctx, code, reason) => {
      console.log('Client disconnected', { code, reason });
    }
  },
  wsServer
);

server.listen().then(({ url }) => {
  console.log(`Server ready at ${url}`);
});
```

## 4. 客户端实现

### Apollo Client 订阅

```javascript
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { WebSocketLink } from '@apollo/client/link/ws';
import { getMainDefinition } from '@apollo/client/utilities';

// HTTP 链接
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql'
});

// WebSocket 链接
const wsLink = new WebSocketLink({
  uri: 'ws://localhost:4000/graphql',
  options: {
    reconnect: true,
    connectionParams: {
      authToken: localStorage.getItem('token')
    },
    lazy: true,
    connectionCallback: (error) => {
      if (error) {
        console.error('WebSocket connection error:', error);
      } else {
        console.log('WebSocket connected');
      }
    }
  }
});

// 分割链接
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache()
});

// 使用订阅
import { gql, useSubscription } from '@apollo/client';

const POST_CREATED_SUBSCRIPTION = gql`
  subscription OnPostCreated {
    postCreated {
      id
      title
      content
      author {
        name
      }
      createdAt
    }
  }
`;

function PostList() {
  const { data, loading, error } = useSubscription(POST_CREATED_SUBSCRIPTION);

  if (loading) return <div>Loading...</div>;
  if (error) return <div>Error: {error.message}</div>;

  return (
    <div>
      <h3>New Post Created:</h3>
      <p>{data.postCreated.title}</p>
    </div>
  );
}
```

### React Hooks 订阅

```javascript
import { useSubscription, useMutation } from '@apollo/client';

function ChatRoom({ roomId }) {
  const MESSAGE_ADDED = gql`
    subscription OnMessageAdded($roomId: ID!) {
      messageAdded(roomId: $roomId) {
        id
        text
        author {
          name
        }
        createdAt
      }
    }
  `;

  const SEND_MESSAGE = gql`
    mutation SendMessage($roomId: ID!, $text: String!) {
      sendMessage(roomId: $roomId, text: $text) {
        id
        text
      }
    }
  `;

  const { data: messageData, loading } = useSubscription(MESSAGE_ADDED, {
    variables: { roomId }
  });

  const [sendMessage] = useMutation(SEND_MESSAGE);

  const handleSendMessage = (text) => {
    sendMessage({ variables: { roomId, text } });
  };

  return (
    <div>
      {loading ? (
        <div>Connecting...</div>
      ) : (
        <div>
          <MessageList messages={messageData?.messageAdded} />
          <MessageInput onSend={handleSendMessage} />
        </div>
      )}
    </div>
  );
}
```

## 5. 订阅过滤

### 基于参数的过滤

```javascript
const resolvers = {
  Subscription: {
    notification: {
      subscribe: (_, { userId, types }) => {
        const asyncIterator = pubsub.asyncIterator(['NOTIFICATION']);
        return {
          [Symbol.asyncIterator]() {
            return (async function* () {
              for await (const event of asyncIterator) {
                const notification = event.notification;
                // 过滤用户
                if (userId && notification.userId !== userId) {
                  continue;
                }
                // 过滤类型
                if (types && !types.includes(notification.type)) {
                  continue;
                }
                yield event;
              }
            })();
          }
        };
      }
    }
  }
};
```

### 基于权限的过滤

```javascript
const resolvers = {
  Subscription: {
    userUpdate: {
      subscribe: async (_, __, context) => {
        // 验证用户权限
        if (!context.user) {
          throw new Error('Unauthorized');
        }
        const asyncIterator = pubsub.asyncIterator(['USER_UPDATE']);
        return {
          [Symbol.asyncIterator]() {
            return (async function* () {
              for await (const event of asyncIterator) {
                const update = event.userUpdate;
                // 只返回当前用户的更新
                if (update.userId !== context.user.id) {
                  continue;
                }
                // 只返回有权限查看的字段
                const filteredUpdate = filterSensitiveFields(update, context.user.role);
                yield { userUpdate: filteredUpdate };
              }
            })();
          }
        };
      }
    }
  }
};
```

## 6. 订阅错误处理

### 连接错误处理

```javascript
const wsLink = new WebSocketLink({
  uri: 'ws://localhost:4000/graphql',
  options: {
    reconnect: true,
    retryAttempts: 5,
    connectionParams: async () => {
      const token = await getAuthToken();
      return { token };
    },
    on: {
      connected: () => console.log('WebSocket connected'),
      error: (error) => {
        console.error('WebSocket error:', error);
        // 尝试重新连接
      },
      closed: (event) => {
        console.log('WebSocket closed:', event);
        // 清理资源
      }
    }
  }
});
```

### 订阅错误处理

```javascript
function useSubscriptionWithErrorHandling(query, options) {
  const { data, error, loading } = useSubscription(query, options);

  useEffect(() => {
    if (error) {
      console.error('Subscription error:', error);
      // 根据错误类型处理
      if (error.networkError) {
        // 网络错误,尝试重连
        handleNetworkError(error);
      } else if (error.graphQLErrors) {
        // GraphQL 错误
        handleGraphQLError(error);
      }
    }
  }, [error]);

  return { data, error, loading };
}
```

## 7. 订阅性能优化

### 批量发布

```javascript
class BatchPublisher {
  constructor(pubsub, eventName, batchSize = 10, flushInterval = 100) {
    this.pubsub = pubsub;
    this.eventName = eventName;
    this.batchSize = batchSize;
    this.flushInterval = flushInterval;
    this.batch = [];
    this.flushTimer = null;
  }

  add(event) {
    this.batch.push(event);
    if (this.batch.length >= this.batchSize) {
      this.flush();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.batch.length === 0) return;
    // 批量发布
    this.pubsub.publish(this.eventName, { batch: this.batch });
    this.batch = [];
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
  }
}

// 使用批量发布器
const batchPublisher = new BatchPublisher(pubsub, 'BATCH_EVENTS');

// 添加事件到批次
batchPublisher.add({ type: 'event1', data: {} });
batchPublisher.add({ type: 'event2', data: {} });
```

### 订阅节流

```javascript
function useThrottledSubscription(query, options, throttleMs = 1000) {
  const { data, loading } = useSubscription(query, options);
  const [throttledData, setThrottledData] = useState(null);
  const lastUpdate = useRef(0);

  useEffect(() => {
    if (data) {
      const now = Date.now();
      if (now - lastUpdate.current > throttleMs) {
        setThrottledData(data);
        lastUpdate.current = now;
      }
    }
  }, [data, throttleMs]);

  return { data: throttledData, loading };
}
```

## 8. 订阅监控

### 连接监控

```javascript
const connectionMetrics = {
  activeConnections: 0,
  totalConnections: 0,
  disconnections: 0
};

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql'
});

useServer(
  {
    schema: server.schema,
    onConnect: () => {
      connectionMetrics.totalConnections++;
      connectionMetrics.activeConnections++;
      console.log('Connection metrics:', connectionMetrics);
    },
    onDisconnect: () => {
      connectionMetrics.activeConnections--;
      connectionMetrics.disconnections++;
      console.log('Connection metrics:', connectionMetrics);
    }
  },
  wsServer
);
```

### 订阅指标

```javascript
const subscriptionMetrics = new Map();

function trackSubscription(eventName) {
  if (!subscriptionMetrics.has(eventName)) {
    subscriptionMetrics.set(eventName, {
      count: 0,
      lastPublished: null
    });
  }
  const metrics = subscriptionMetrics.get(eventName);
  metrics.count++;
  metrics.lastPublished = new Date();
}

// 在发布事件时追踪
pubsub.publish(POST_CREATED, { postCreated: post });
trackSubscription(POST_CREATED);
```

## 9. 订阅最佳实践

| 实践 | 说明 |
|------|------|
| 使用 Redis PubSub | 支持分布式部署 |
| 实现连接认证 | 确保订阅安全 |
| 添加错误处理 | 提高稳定性 |
| 实现过滤机制 | 减少不必要的数据推送 |
| 监控连接状态 | 及时发现问题 |
| 使用批量发布 | 提高性能 |
| 实现重连机制 | 提高可靠性 |
| 限制订阅数量 | 防止资源耗尽 |
| 设置超时时间 | 避免僵尸连接 |
| 记录订阅日志 | 便于调试和分析 |

## 10. 常见问题及解决方案

| 问题 | 原因 | 解决方案 |
|------|------|----------|
| 连接频繁断开 | 网络不稳定、超时 | 实现自动重连、增加超时时间 |
| 订阅延迟高 | 服务器负载高、处理慢 | 优化性能、使用批量发布 |
| 内存泄漏 | 未正确清理订阅 | 确保取消订阅、清理资源 |
| 数据不一致 | 缓存未更新 | 实现缓存失效机制 |
| 安全问题 | 未验证连接 | 实现连接认证和授权 |