乐闻世界logo
搜索文章和话题

How to implement GraphQL Subscriptions

2月21日 17:00

GraphQL Subscriptions Implementation Explained

GraphQL subscriptions let the server push real-time data updates to clients, which makes them a key feature for building real-time applications. The sections below walk through server-side and client-side implementations in detail.

1. Basic Subscription Concepts

How Subscriptions Work

  1. Client establishes persistent connection via WebSocket
  2. Client sends subscription query
  3. Server maintains connection and listens for events
  4. When event occurs, server pushes data to client
  5. Client receives and processes updates

Subscriptions vs Polling

| Feature | Subscriptions | Polling |
| --- | --- | --- |
| Real-time | High | Low |
| Server load | Low (event-driven) | High (continuous queries) |
| Network overhead | Low (push on demand) | High (periodic requests) |
| Implementation complexity | High | Low |
| Use case | Real-time updates | Periodic checks |

2. Server-side Implementation

Using graphql-subscriptions

javascript
const { PubSub, withFilter } = require('graphql-subscriptions');

// In-memory event bus; suitable for a single server process only.
const pubsub = new PubSub();

// Event (trigger) names shared by the publishers and the subscribers below.
const POST_CREATED = 'POST_CREATED';
const POST_UPDATED = 'POST_UPDATED';
const COMMENT_ADDED = 'COMMENT_ADDED';

// NOTE(review): `User`, `DateTime`, `Query` and `Mutation` are referenced by
// this snippet but not defined in it; they must come from the rest of the schema.
const typeDefs = `
  type Post {
    id: ID!
    title: String!
    content: String!
    author: User!
    createdAt: DateTime!
  }

  type Comment {
    id: ID!
    text: String!
    author: User!
    post: Post!
    createdAt: DateTime!
  }

  type Subscription {
    postCreated: Post!
    postUpdated(postId: ID!): Post!
    commentAdded(postId: ID!): Comment!
  }
`;

/**
 * Compares two identifiers as strings.
 *
 * `ID` arguments reach resolvers as strings, while the stored id may be a
 * number, so a plain `===` could silently drop every event.
 *
 * @param {string|number} left
 * @param {string|number} right
 * @returns {boolean} true when both ids have the same string form
 */
const isSameId = (left, right) => String(left) === String(right);

const resolvers = {
  Subscription: {
    // Every subscriber receives every created post.
    postCreated: {
      subscribe: () => pubsub.asyncIterator([POST_CREATED]),
    },

    // `withFilter` forwards `return()` to the underlying iterator, so the
    // PubSub listener is released as soon as the client unsubscribes. A
    // hand-written `for await` wrapper only releases it after the next event.
    postUpdated: {
      subscribe: withFilter(
        () => pubsub.asyncIterator([POST_UPDATED]),
        // Only deliver updates for the post the client asked about.
        (payload, { postId }) => isSameId(payload.postUpdated.id, postId),
      ),
    },

    commentAdded: {
      subscribe: withFilter(
        () => pubsub.asyncIterator([COMMENT_ADDED]),
        // Only deliver comments that belong to the requested post.
        (payload, { postId }) => isSameId(payload.commentAdded.postId, postId),
      ),
    },
  },

  Mutation: {
    // Each mutation persists first, then publishes under the key that matches
    // the subscription field name. `publish` returns a promise; it is awaited
    // so a failing transport surfaces as a mutation error instead of an
    // unhandled rejection.
    createPost: async (_, { input }) => {
      const post = await Post.create(input);
      await pubsub.publish(POST_CREATED, { postCreated: post });
      return post;
    },

    updatePost: async (_, { id, input }) => {
      const post = await Post.update(id, input);
      await pubsub.publish(POST_UPDATED, { postUpdated: post });
      return post;
    },

    addComment: async (_, { input }) => {
      const comment = await Comment.create(input);
      await pubsub.publish(COMMENT_ADDED, { commentAdded: comment });
      return comment;
    },
  },
};

Using Redis PubSub

javascript
const { RedisPubSub } = require('graphql-redis-subscriptions');

/**
 * Redis-backed PubSub so that events published by one server instance reach
 * subscribers connected to any other instance.
 *
 * The `connection` object is handed to the Redis client used by
 * graphql-redis-subscriptions (ioredis). ioredis reads `retryStrategy(times)`;
 * the node_redis-style `retry_strategy(options)` callback is not consulted,
 * so it has been replaced.
 */
const pubsub = new RedisPubSub({
  connection: {
    host: 'localhost',
    port: 6379,
    /**
     * Decides how long to wait before the next reconnection attempt.
     *
     * @param {number} times - 1-based count of reconnection attempts so far
     * @returns {number|null} delay in milliseconds, or null to stop retrying
     */
    retryStrategy: (times) => {
      // Give up after 10 attempts; a non-number return value stops reconnecting.
      if (times > 10) {
        return null;
      }
      // Linear back-off, capped at 3 seconds.
      return Math.min(times * 100, 3000);
    },
  },
});

// Usage same as in-memory PubSub
const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator([POST_CREATED]),
    },
  },
};

3. Apollo Server Subscription Implementation

Configure Apollo Server

javascript
// Wires an Apollo Server (queries/mutations over HTTP) together with a
// graphql-ws WebSocket server (subscriptions) on the same `/graphql` path.
// `typeDefs` and `resolvers` are expected to be defined elsewhere.
const { ApolloServer } = require('apollo-server-express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const { useServer } = require('graphql-ws/lib/use/ws');

const server = new ApolloServer({
  typeDefs,
  resolvers,
  // Builds the per-operation context. Returns undefined when neither `req`
  // nor `connection` is present.
  context: ({ req, connection }) => {
    // HTTP request context
    if (req) {
      return { token: req.headers.authorization };
    }
    // WebSocket connection context
    // NOTE(review): `connection` looks like the subscriptions-transport-ws style
    // argument; subscriptions below go through graphql-ws `useServer`, which
    // builds its own context — confirm this branch is still reachable.
    if (connection) {
      return { token: connection.context.authorization };
    }
  }
});

// NOTE(review): `server` is an ApolloServer instance, not a request listener;
// http.createServer(options, listener) would treat a non-function first
// argument as options. Presumably an Express app with the Apollo middleware
// applied was intended here — verify.
const httpServer = createServer(server);

// Create WebSocket server
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql'
});

useServer(
  {
    // NOTE(review): assumes ApolloServer exposes its executable schema as
    // `schema` — TODO confirm; otherwise build the schema once and pass the
    // same object to both servers.
    schema: server.schema,
    // Context for subscription operations: rejects when the client did not
    // send an `authorization` connection parameter.
    context: (ctx) => {
      // Validate connection
      const token = ctx.connectionParams?.authorization;
      if (!token) {
        throw new Error('Unauthorized');
      }
      return { token };
    },
    onConnect: (ctx) => {
      console.log('Client connected');
      // NOTE(review): the returned object echoes the client's own
      // `authorization` value; check how graphql-ws uses an object returned
      // from onConnect before relying on this.
      return { authorization: ctx.connectionParams?.authorization };
    },
    onDisconnect: (ctx, code, reason) => {
      console.log('Client disconnected', { code, reason });
    }
  },
  wsServer
);

// NOTE(review): nothing in this snippet calls `httpServer.listen(...)`, yet
// `httpServer` is the server the WebSocket endpoint is attached to. Confirm
// that `server.listen()` exists for the `apollo-server-express` import and that
// it starts `httpServer`.
server.listen().then(({ url }) => {
  console.log(`Server ready at ${url}`);
});

4. Client-side Implementation

Apollo Client Subscriptions

javascript
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client'; import { WebSocketLink } from '@apollo/client/link/ws'; import { getMainDefinition } from '@apollo/client/utilities'; // HTTP link const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' }); // WebSocket link const wsLink = new WebSocketLink({ uri: 'ws://localhost:4000/graphql', options: { reconnect: true, connectionParams: { authToken: localStorage.getItem('token') }, lazy: true, connectionCallback: (error) => { if (error) { console.error('WebSocket connection error:', error); } else { console.log('WebSocket connected'); } } } }); // Split link const splitLink = split( ({ query }) => { const definition = getMainDefinition(query); return ( definition.kind === 'OperationDefinition' && definition.operation === 'subscription' ); }, wsLink, httpLink ); const client = new ApolloClient({ link: splitLink, cache: new InMemoryCache() }); // Use subscription import { gql, useSubscription } from '@apollo/client'; const POST_CREATED_SUBSCRIPTION = gql` subscription OnPostCreated { postCreated { id title content author { name } createdAt } } `; function PostList() { const { data, loading, error } = useSubscription(POST_CREATED_SUBSCRIPTION); if (loading) return <div>Loading...</div>; if (error) return <div>Error: {error.message}</div>; return ( <div> <h3>New Post Created:</h3> <p>{data.postCreated.title}</p> </div> ); }

React Hooks Subscriptions

javascript
import { useSubscription, useMutation } from '@apollo/client'; function ChatRoom({ roomId }) { const MESSAGE_ADDED = gql` subscription OnMessageAdded($roomId: ID!) { messageAdded(roomId: $roomId) { id text author { name } createdAt } } `; const SEND_MESSAGE = gql` mutation SendMessage($roomId: ID!, $text: String!) { sendMessage(roomId: $roomId, text: $text) { id text } } `; const { data: messageData, loading } = useSubscription(MESSAGE_ADDED, { variables: { roomId } }); const [sendMessage] = useMutation(SEND_MESSAGE); const handleSendMessage = (text) => { sendMessage({ variables: { roomId, text } }); }; return ( <div> {loading ? ( <div>Connecting...</div> ) : ( <div> <MessageList messages={messageData?.messageAdded} /> <MessageInput onSend={handleSendMessage} /> </div> )} </div> ); }

5. Subscription Filtering

Parameter-based Filtering

javascript
const { withFilter } = require('graphql-subscriptions');

const resolvers = {
  Subscription: {
    notification: {
      // `withFilter` wraps the PubSub iterator and forwards `return()` to it,
      // so the listener is released when the client unsubscribes. A manual
      // `for await` wrapper only notices the unsubscribe after the next event.
      subscribe: withFilter(
        () => pubsub.asyncIterator(['NOTIFICATION']),
        /**
         * Decides whether one notification is delivered to this subscriber.
         *
         * @param {{ notification: { userId: *, type: * } }} payload - published event
         * @param {{ userId?: *, types?: Array }} variables - subscription arguments
         * @returns {boolean} true when the event passes every supplied filter
         */
        ({ notification }, { userId, types }) => {
          // Filter user (skipped when no userId was supplied)
          if (userId && notification.userId !== userId) {
            return false;
          }
          // Filter type (skipped when no type list was supplied)
          if (types && !types.includes(notification.type)) {
            return false;
          }
          return true;
        },
      ),
    },
  },
};

Permission-based Filtering

javascript
const { withFilter } = require('graphql-subscriptions');

// Delivers only the current user's own updates. `withFilter` forwards
// `return()` to the PubSub iterator, so the listener is released when the
// client unsubscribes instead of lingering until the next event.
const subscribeToOwnUpdates = withFilter(
  () => pubsub.asyncIterator(['USER_UPDATE']),
  // Only return updates for current user
  (payload, _args, context) => payload.userUpdate.userId === context.user.id,
);

const resolvers = {
  Subscription: {
    userUpdate: {
      /**
       * Starts the subscription for an authenticated user.
       *
       * @throws {Error} 'Unauthorized' when the context carries no user
       */
      subscribe: (root, args, context, info) => {
        // Verify user permission
        if (!context.user) {
          throw new Error('Unauthorized');
        }
        return subscribeToOwnUpdates(root, args, context, info);
      },

      // Only return fields user has permission to view. Runs once per
      // delivered event, after the filter above has accepted it.
      resolve: (payload, _args, context) =>
        filterSensitiveFields(payload.userUpdate, context.user.role),
    },
  },
};

6. Subscription Error Handling

Connection Error Handling

javascript
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

// `on`, `retryAttempts` and an async `connectionParams` are graphql-ws client
// options, so the link is built from a graphql-ws client rather than the
// legacy `WebSocketLink`.
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    // Number of automatic reconnection attempts before giving up.
    retryAttempts: 5,
    // Resolved on every (re)connect, so a refreshed token is picked up.
    // Sent as `authorization` because that is the connection parameter the
    // server-side context validates. `getAuthToken` is defined elsewhere.
    connectionParams: async () => {
      const token = await getAuthToken();
      return { authorization: token };
    },
    on: {
      connected: () => console.log('WebSocket connected'),
      error: (error) => {
        // Reconnection is handled by the client according to `retryAttempts`.
        console.error('WebSocket error:', error);
      },
      closed: (event) => {
        // Hook for releasing resources tied to the connection.
        console.log('WebSocket closed:', event);
      },
    },
  }),
);

Subscription Error Handling

javascript
/**
 * Wraps `useSubscription` and routes subscription errors to the matching
 * handler.
 *
 * `useSubscription`, `useEffect`, `handleNetworkError` and
 * `handleGraphQLError` must be in scope where this hook is defined.
 *
 * @param {*} query - subscription document passed through to `useSubscription`
 * @param {*} options - options passed through to `useSubscription`
 * @returns {{ data: *, error: *, loading: boolean }} the untouched hook result
 */
function useSubscriptionWithErrorHandling(query, options) {
  const { data, error, loading } = useSubscription(query, options);

  useEffect(() => {
    if (!error) {
      return;
    }

    console.error('Subscription error:', error);

    // Handle based on error type
    if (error.networkError) {
      // Network error, try to reconnect
      handleNetworkError(error);
    } else if (error.graphQLErrors?.length > 0) {
      // GraphQL error. The length check matters: an empty array is truthy, so
      // testing the array itself would send every non-network error here.
      handleGraphQLError(error);
    }
  }, [error]);

  return { data, error, loading };
}

7. Subscription Performance Optimization

Batch Publishing

javascript
/**
 * Collects events and publishes them together as one `{ batch: [...] }`
 * payload, either when `batchSize` events are queued or `flushInterval`
 * milliseconds after the first queued event, whichever comes first.
 */
class BatchPublisher {
  /**
   * @param {{ publish: Function }} pubsub - object with `publish(eventName, payload)`
   * @param {string} eventName - event name every batch is published under
   * @param {number} [batchSize=10] - queue length that triggers an immediate flush
   * @param {number} [flushInterval=100] - maximum wait in ms before a partial batch is flushed
   */
  constructor(pubsub, eventName, batchSize = 10, flushInterval = 100) {
    this.pubsub = pubsub;
    this.eventName = eventName;
    this.batchSize = batchSize;
    this.flushInterval = flushInterval;
    this.batch = [];
    this.flushTimer = null;
  }

  /**
   * Queues one event and flushes or arms the flush timer as needed.
   *
   * @param {*} event - event to include in the next batch
   */
  add(event) {
    this.batch.push(event);

    if (this.batch.length >= this.batchSize) {
      this.#flushInBackground();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(
        () => this.#flushInBackground(),
        this.flushInterval,
      );
    }
  }

  /**
   * Publishes everything queued so far and cancels any pending timer.
   *
   * @returns {Promise<void>} settles when `pubsub.publish` has settled;
   *   rejects with the publish error. The events of a failed batch are not
   *   re-queued.
   */
  async flush() {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    if (this.batch.length === 0) {
      return;
    }

    // Detach the queue before publishing so events added while the publish is
    // in flight land in the next batch.
    const events = this.batch;
    this.batch = [];

    // Batch publish
    await this.pubsub.publish(this.eventName, { batch: events });
  }

  // Flushes without a caller to await the result: failures are logged so a
  // rejected publish cannot turn into an unhandled promise rejection.
  #flushInBackground() {
    this.flush().catch((error) => {
      console.error('Batch publish failed:', error);
    });
  }
}

// Use batch publisher
const batchPublisher = new BatchPublisher(pubsub, 'BATCH_EVENTS');

// Add events to batch
batchPublisher.add({ type: 'event1', data: {} });
batchPublisher.add({ type: 'event2', data: {} });

Subscription Throttling

javascript
/**
 * Subscription hook that exposes new data at most once per `throttleMs`.
 *
 * The first update in a window is exposed immediately. An update that arrives
 * inside the window is exposed when the window ends, so the latest value is
 * never lost; a leading-edge-only throttle would leave the UI on a stale value
 * whenever the stream goes quiet right after a burst.
 *
 * `useSubscription`, `useState`, `useRef` and `useEffect` must be in scope.
 *
 * @param {*} query - subscription document passed through to `useSubscription`
 * @param {*} options - options passed through to `useSubscription`
 * @param {number} [throttleMs=1000] - minimum time between exposed updates
 * @returns {{ data: *, loading: boolean }} throttled data (null until the
 *   first update) and the raw loading flag
 */
function useThrottledSubscription(query, options, throttleMs = 1000) {
  const { data, loading } = useSubscription(query, options);
  const [throttledData, setThrottledData] = useState(null);
  // Timestamp (ms) of the last exposed update; 0 means "never".
  const lastUpdate = useRef(0);

  useEffect(() => {
    if (!data) {
      return undefined;
    }

    const expose = () => {
      setThrottledData(data);
      lastUpdate.current = Date.now();
    };

    const elapsed = Date.now() - lastUpdate.current;
    if (elapsed > throttleMs) {
      expose();
      return undefined;
    }

    // Inside the window: deliver this value when the window closes. The
    // cleanup cancels it if newer data (or an unmount) arrives first.
    const timer = setTimeout(expose, throttleMs - elapsed);
    return () => clearTimeout(timer);
  }, [data, throttleMs]);

  return { data: throttledData, loading };
}

8. Subscription Monitoring

Connection Monitoring

javascript
// Running counters for WebSocket connections handled by this process.
const connectionMetrics = {
  activeConnections: 0,
  totalConnections: 0,
  disconnections: 0
};

// Prints the current counters.
function logConnectionMetrics() {
  console.log('Connection metrics:', connectionMetrics);
}

// Counts a newly connected client.
function recordConnect() {
  connectionMetrics.totalConnections += 1;
  connectionMetrics.activeConnections += 1;
  logConnectionMetrics();
}

// Counts a client that went away.
function recordDisconnect() {
  connectionMetrics.activeConnections -= 1;
  connectionMetrics.disconnections += 1;
  logConnectionMetrics();
}

// `httpServer`, `server`, `WebSocketServer` and `useServer` come from the
// server setup shown earlier in the article.
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql'
});

useServer(
  {
    schema: server.schema,
    onConnect: () => {
      recordConnect();
    },
    onDisconnect: () => {
      recordDisconnect();
    }
  },
  wsServer
);

Subscription Metrics

javascript
// Per-event publish statistics, keyed by event name.
const subscriptionMetrics = new Map();

/**
 * Records one published event.
 *
 * @param {string} eventName - name of the event that was published
 */
function trackSubscription(eventName) {
  let metrics = subscriptionMetrics.get(eventName);
  if (!metrics) {
    metrics = { count: 0, lastPublished: null };
    subscriptionMetrics.set(eventName, metrics);
  }
  metrics.count += 1;
  metrics.lastPublished = new Date();
}

/**
 * Track when publishing events: publishes a created post and records it.
 *
 * Wrapped in a function because `post` only exists where the post was just
 * created (for example inside the `createPost` resolver). The publish is
 * awaited so a failed publish is neither counted nor left as an unhandled
 * rejection. `pubsub` and `POST_CREATED` are defined elsewhere.
 *
 * @param {object} post - the post that was just created
 * @returns {Promise<void>} rejects with the publish error, if any
 */
async function publishPostCreated(post) {
  await pubsub.publish(POST_CREATED, { postCreated: post });
  trackSubscription(POST_CREATED);
}

9. Subscription Best Practices

| Practice | Description |
| --- | --- |
| Use Redis PubSub | Support distributed deployment |
| Implement connection authentication | Ensure subscription security |
| Add error handling | Improve stability |
| Implement filtering mechanisms | Reduce unnecessary data pushes |
| Monitor connection status | Detect issues in time |
| Use batch publishing | Improve performance |
| Implement reconnection mechanism | Improve reliability |
| Limit subscription count | Prevent resource exhaustion |
| Set timeout | Avoid zombie connections |
| Log subscriptions | Facilitate debugging and analysis |

10. Common Issues and Solutions

| Issue | Cause | Solution |
| --- | --- | --- |
| Frequent disconnections | Network instability, timeout | Implement auto-reconnect, increase timeout |
| High subscription latency | High server load, slow processing | Optimize performance, use batch publishing |
| Memory leak | Subscriptions not cleaned up properly | Ensure unsubscribe, clean up resources |
| Data inconsistency | Cache not updated | Implement cache invalidation |
| Security issues | Connection not verified | Implement connection authentication and authorization |
标签:GraphQL