GraphQL Subscriptions Implementation Explained
GraphQL subscriptions let clients receive real-time data updates pushed by the server, which is a key feature for building real-time applications. The sections below walk through how to implement them on both the server and the client.
1. Basic Subscription Concepts
How Subscriptions Work
- Client establishes persistent connection via WebSocket
- Client sends subscription query
- Server maintains connection and listens for events
- When event occurs, server pushes data to client
- Client receives and processes updates
Subscriptions vs Polling
| Feature | Subscriptions | Polling |
|---|---|---|
| Real-time | High | Low |
| Server load | Low (event-driven) | High (continuous queries) |
| Network overhead | Low (push on demand) | High (periodic requests) |
| Implementation complexity | High | Low |
| Use case | Real-time updates | Periodic checks |
2. Server-side Implementation
Using graphql-subscriptions
const { PubSub } = require('graphql-subscriptions');
// Event bus shared by the subscription resolvers (which listen on it) and the
// mutations (which publish to it) in this example.
const pubsub = new PubSub();
// Trigger names. The subscribe side and the publish side must use the same
// string, so they are defined once here.
const POST_CREATED = 'POST_CREATED';
const POST_UPDATED = 'POST_UPDATED';
const COMMENT_ADDED = 'COMMENT_ADDED';
// GraphQL schema (SDL) for the subscription examples: the Post and Comment
// object types plus the Subscription root type. postUpdated and commentAdded
// take a required postId argument so a client can follow a single post.
// NOTE(review): User and DateTime are referenced but not declared in this
// snippet — presumably defined elsewhere in the schema; confirm before using
// this string on its own.
const typeDefs = `
type Post {
id: ID!
title: String!
content: String!
author: User!
createdAt: DateTime!
}
type Comment {
id: ID!
text: String!
author: User!
post: Post!
createdAt: DateTime!
}
type Subscription {
postCreated: Post!
postUpdated(postId: ID!): Post!
commentAdded(postId: ID!): Comment!
}
`;
/**
 * Wraps an async iterable so that only values accepted by `predicate` are
 * emitted.
 *
 * This is a hand-written iterator rather than an async generator on purpose:
 * an async generator queues `return()` while it is suspended on an inner
 * `await`, so an unsubscribe would only take effect after the next matching
 * event and the underlying listener would stay registered until then. Here
 * `return()` and `throw()` are forwarded to the source immediately.
 *
 * @param {AsyncIterable<object>} source - Stream of published payloads.
 * @param {(value: object) => boolean | Promise<boolean>} predicate - Decides
 *   whether a payload is delivered to the subscriber.
 * @returns {AsyncIterableIterator<object>} Filtered view over `source`.
 */
function filterAsyncIterator(source, predicate) {
  const inner = source[Symbol.asyncIterator]();
  return {
    async next() {
      for (;;) {
        const result = await inner.next();
        if (result.done || (await predicate(result.value))) {
          return result;
        }
      }
    },
    return(value) {
      if (typeof inner.return === 'function') {
        return inner.return(value);
      }
      return Promise.resolve({ value, done: true });
    },
    throw(error) {
      if (typeof inner.throw === 'function') {
        return inner.throw(error);
      }
      return Promise.reject(error);
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

// ID arguments reach resolvers as strings, while ids on model objects may be
// numbers; compare their string forms so a numeric 1 matches the argument "1".
const sameId = (left, right) => String(left) === String(right);

/**
 * Resolver map for the post/comment subscriptions and the mutations that feed
 * them. Each mutation publishes its result under the key named after the
 * subscription field (e.g. `{ postCreated: post }`).
 */
const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator([POST_CREATED]),
    },
    postUpdated: {
      // Filter: only deliver updates for the requested post.
      subscribe: (_, { postId }) =>
        filterAsyncIterator(
          pubsub.asyncIterator([POST_UPDATED]),
          (event) => sameId(event.postUpdated.id, postId),
        ),
    },
    commentAdded: {
      // Filter: only deliver comments that belong to the requested post.
      subscribe: (_, { postId }) =>
        filterAsyncIterator(
          pubsub.asyncIterator([COMMENT_ADDED]),
          (event) => sameId(event.commentAdded.postId, postId),
        ),
    },
  },
  Mutation: {
    // publish() is awaited in every mutation so that a failing event bus
    // rejects the mutation instead of leaving an unhandled promise rejection.
    createPost: async (_, { input }) => {
      const post = await Post.create(input);
      await pubsub.publish(POST_CREATED, { postCreated: post });
      return post;
    },
    updatePost: async (_, { id, input }) => {
      const post = await Post.update(id, input);
      await pubsub.publish(POST_UPDATED, { postUpdated: post });
      return post;
    },
    addComment: async (_, { input }) => {
      const comment = await Comment.create(input);
      await pubsub.publish(COMMENT_ADDED, { commentAdded: comment });
      return comment;
    },
  },
};
Using Redis PubSub
const { RedisPubSub } = require('graphql-redis-subscriptions');
// Redis-backed PubSub so that events published by one server instance reach
// subscribers connected to any other instance.
// The `connection` object is handed to the Redis client used by
// graphql-redis-subscriptions (ioredis), so the reconnect policy must use the
// ioredis option `retryStrategy(times)`; the node_redis-style
// `retry_strategy(options)` callback is not part of that API.
const pubsub = new RedisPubSub({
  connection: {
    host: 'localhost',
    port: 6379,
    // Called with the number of the current reconnect attempt. Returning a
    // number retries after that many milliseconds; returning a non-number
    // stops reconnecting.
    // NOTE(review): the callback only receives the attempt count, so the
    // previous ECONNREFUSED and total-retry-time checks cannot be expressed
    // here; the attempt cap below bounds retries instead.
    retryStrategy: (attempt) => {
      if (attempt > 10) {
        return null;
      }
      // Linear backoff, capped at 3 seconds.
      return Math.min(attempt * 100, 3000);
    },
  },
});
// The Redis-backed pubsub is used exactly like the in-memory PubSub:
// the resolver simply hands back an async iterator for the trigger.
const resolvers = {
  Subscription: {
    postCreated: {
      subscribe() {
        const triggers = [POST_CREATED];
        return pubsub.asyncIterator(triggers);
      },
    },
  },
};
3. Apollo Server Subscription Implementation
const { ApolloServer } = require('apollo-server-express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const { useServer } = require('graphql-ws/lib/use/ws');
// Port shared by the HTTP endpoint and the WebSocket endpoint; the client
// examples connect to localhost:4000.
const PORT = 4000;

// Apollo Server handles queries and mutations over HTTP.
const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: ({ req, connection }) => {
    // HTTP request context
    if (req) {
      return { token: req.headers.authorization };
    }
    // WebSocket connection context
    // NOTE(review): `connection` looks like the legacy built-in subscription
    // transport; with graphql-ws the subscription context comes from the
    // `context` callback passed to useServer below — confirm whether this
    // branch is still reachable.
    if (connection) {
      return { token: connection.context.authorization };
    }
  },
});

// NOTE(review): http.createServer expects a request-listener function. With
// apollo-server-express the server is normally mounted on an Express app
// (after `await server.start()`) and that app is passed here — confirm and
// wire in the app; doing so needs a package this snippet does not import.
const httpServer = createServer(server);

// Create WebSocket server on the same HTTP server and path as the HTTP API.
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

useServer(
  {
    // NOTE(review): graphql-ws needs an executable schema; confirm that
    // `server.schema` is exposed by the installed Apollo Server version,
    // otherwise build the schema once and pass it to both servers.
    schema: server.schema,
    // Builds the per-operation context and rejects operations from
    // connections that did not send an `authorization` connection param.
    context: (ctx) => {
      // Validate connection
      const token = ctx.connectionParams?.authorization;
      if (!token) {
        throw new Error('Unauthorized');
      }
      return { token };
    },
    onConnect: (ctx) => {
      console.log('Client connected');
      return { authorization: ctx.connectionParams?.authorization };
    },
    onDisconnect: (ctx, code, reason) => {
      console.log('Client disconnected', { code, reason });
    },
  },
  wsServer,
);

// Start the HTTP server that the WebSocket server is attached to. Starting
// any other listener would leave `wsServer` without a listening socket, so
// subscriptions could never connect.
httpServer.listen(PORT, () => {
  console.log(`Server ready at http://localhost:${PORT}/graphql`);
});
4. Client-side Implementation
Apollo Client Subscriptions
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { WebSocketLink } from '@apollo/client/link/ws';
import { getMainDefinition } from '@apollo/client/utilities';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

// HTTP link: carries queries and mutations.
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

// WebSocket link: carries subscriptions.
// The server side of this guide uses the graphql-ws protocol (useServer), so
// the client must use the graphql-ws client via GraphQLWsLink; the older
// WebSocketLink speaks a different, incompatible protocol.
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    // Only open the socket once the first subscription is started.
    lazy: true,
    // Evaluated on every (re)connect so a token stored after page load is
    // picked up. The key must be `authorization` because that is the
    // connection param the server reads.
    connectionParams: () => ({
      authorization: localStorage.getItem('token'),
    }),
    // Reconnection is handled by the graphql-ws client itself.
    on: {
      connected: () => console.log('WebSocket connected'),
      error: (error) => console.error('WebSocket connection error:', error),
    },
  }),
);

// Split link: route subscription operations to the WebSocket link and
// everything else to the HTTP link.
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink,
);

const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});
// Use subscription
import { gql, useSubscription } from '@apollo/client';
// Subscription document for newly created posts. It selects the fields the
// PostList component renders (title) plus id, content, author name and
// createdAt.
const POST_CREATED_SUBSCRIPTION = gql`
subscription OnPostCreated {
postCreated {
id
title
content
author {
name
}
createdAt
}
}
`;
function PostList() {
const { data, loading, error } = useSubscription(POST_CREATED_SUBSCRIPTION);
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return (
<div>
<h3>New Post Created:</h3>
<p>{data.postCreated.title}</p>
</div>
);
}
React Hooks Subscriptions
import { useSubscription, useMutation } from '@apollo/client';
function ChatRoom({ roomId }) {
const MESSAGE_ADDED = gql`
subscription OnMessageAdded($roomId: ID!) {
messageAdded(roomId: $roomId) {
id
text
author {
name
}
createdAt
}
}
`;
const SEND_MESSAGE = gql`
mutation SendMessage($roomId: ID!, $text: String!) {
sendMessage(roomId: $roomId, text: $text) {
id
text
}
}
`;
const { data: messageData, loading } = useSubscription(MESSAGE_ADDED, {
variables: { roomId }
});
const [sendMessage] = useMutation(SEND_MESSAGE);
const handleSendMessage = (text) => {
sendMessage({ variables: { roomId, text } });
};
return (
<div>
{loading ? (
<div>Connecting...</div>
) : (
<div>
<MessageList messages={messageData?.messageAdded} />
<MessageInput onSend={handleSendMessage} />
</div>
)}
</div>
);
}
5. Subscription Filtering
Parameter-based Filtering
/**
 * Resolver map for the `notification` subscription with argument-based
 * filtering: an event is delivered only if it matches the optional `userId`
 * and, when `types` is given, its type is in that list.
 */
const resolvers = {
  Subscription: {
    notification: {
      subscribe: (_, { userId, types }) => {
        const source = pubsub.asyncIterator(['NOTIFICATION'])[Symbol.asyncIterator]();

        const matches = ({ notification }) => {
          // Filter user. IDs are compared by string form because an ID
          // argument arrives as a string while the stored id may be a number.
          if (userId && String(notification.userId) !== String(userId)) {
            return false;
          }
          // Filter type
          if (types && !types.includes(notification.type)) {
            return false;
          }
          return true;
        };

        // Hand-written iterator instead of an async generator: a generator
        // queues return() while it awaits the next event, so an unsubscribe
        // would not release the pubsub listener until another matching event
        // arrived. Here return()/throw() reach the source immediately.
        return {
          async next() {
            for (;;) {
              const result = await source.next();
              if (result.done || matches(result.value)) {
                return result;
              }
            }
          },
          return(value) {
            if (typeof source.return === 'function') {
              return source.return(value);
            }
            return Promise.resolve({ value, done: true });
          },
          throw(error) {
            if (typeof source.throw === 'function') {
              return source.throw(error);
            }
            return Promise.reject(error);
          },
          [Symbol.asyncIterator]() {
            return this;
          },
        };
      },
    },
  },
};
Permission-based Filtering
/**
 * Resolver map for the `userUpdate` subscription with permission-based
 * filtering: the subscriber must be authenticated, only receives updates
 * about their own user, and each payload is passed through
 * `filterSensitiveFields` with the subscriber's role before delivery.
 */
const resolvers = {
  Subscription: {
    userUpdate: {
      subscribe: async (_, __, context) => {
        // Verify user permission
        if (!context.user) {
          throw new Error('Unauthorized');
        }
        const source = pubsub.asyncIterator(['USER_UPDATE'])[Symbol.asyncIterator]();

        // Hand-written iterator instead of an async generator: a generator
        // queues return() while it awaits the next event, so an unsubscribe
        // would not release the pubsub listener until another update for this
        // user arrived. Here return()/throw() reach the source immediately.
        return {
          async next() {
            for (;;) {
              const result = await source.next();
              if (result.done) {
                return result;
              }
              const update = result.value.userUpdate;
              // Only return updates for current user
              if (update.userId !== context.user.id) {
                continue;
              }
              // Only return fields user has permission to view
              const filteredUpdate = filterSensitiveFields(update, context.user.role);
              return { value: { userUpdate: filteredUpdate }, done: false };
            }
          },
          return(value) {
            if (typeof source.return === 'function') {
              return source.return(value);
            }
            return Promise.resolve({ value, done: true });
          },
          throw(error) {
            if (typeof source.throw === 'function') {
              return source.throw(error);
            }
            return Promise.reject(error);
          },
          [Symbol.asyncIterator]() {
            return this;
          },
        };
      },
    },
  },
};
6. Subscription Error Handling
Connection Error Handling
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

// WebSocket link with explicit retry and lifecycle handling.
// `retryAttempts` and the `on` event listeners are options of the graphql-ws
// client, so the link is built with GraphQLWsLink + createClient; nested under
// WebSocketLink's `options` they would not be applied.
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    // Give up after this many consecutive failed reconnect attempts.
    retryAttempts: 5,
    // Resolved on every (re)connect so a refreshed token is used. The key is
    // `authorization` because that is the connection param the server reads.
    connectionParams: async () => {
      const token = await getAuthToken();
      return { authorization: token };
    },
    on: {
      connected: () => console.log('WebSocket connected'),
      error: (error) => {
        console.error('WebSocket error:', error);
        // Reconnection is attempted by the client itself, up to retryAttempts.
      },
      closed: (event) => {
        console.log('WebSocket closed:', event);
        // Clean up resources
      },
    },
  }),
);
Subscription Error Handling
/**
 * Wraps `useSubscription` and routes subscription errors to the matching
 * handler whenever the error changes.
 *
 * @param {object} query - Subscription document passed to `useSubscription`.
 * @param {object} options - Options passed to `useSubscription`.
 * @returns {{ data: *, error: *, loading: boolean }} The values returned by
 *   `useSubscription`, unchanged.
 */
function useSubscriptionWithErrorHandling(query, options) {
  const { data, error, loading } = useSubscription(query, options);
  useEffect(() => {
    if (!error) {
      return;
    }
    console.error('Subscription error:', error);
    // Handle based on error type
    if (error.networkError) {
      // Network error, try to reconnect
      handleNetworkError(error);
    } else if (error.graphQLErrors?.length > 0) {
      // GraphQL error. `graphQLErrors` is an array and an empty array is
      // truthy, so check its length rather than its mere presence.
      handleGraphQLError(error);
    }
  }, [error]);
  return { data, error, loading };
}
7. Subscription Performance Optimization
Batch Publishing
/**
 * Collects events and publishes them together as `{ batch: [...] }` under a
 * single trigger name, either when `batchSize` events have accumulated or
 * `flushInterval` milliseconds after the first event of a batch was added.
 */
class BatchPublisher {
  /**
   * @param {{ publish: Function }} pubsub - Event bus used for publishing.
   * @param {string} eventName - Trigger name every batch is published under.
   * @param {number} [batchSize=10] - Number of events that triggers an
   *   immediate flush.
   * @param {number} [flushInterval=100] - Maximum time in milliseconds an
   *   event waits before being flushed.
   */
  constructor(pubsub, eventName, batchSize = 10, flushInterval = 100) {
    this.pubsub = pubsub;
    this.eventName = eventName;
    this.batchSize = batchSize;
    this.flushInterval = flushInterval;
    this.batch = [];
    this.flushTimer = null;
  }

  /**
   * Queues an event and flushes when the batch is full; otherwise makes sure a
   * flush timer is running.
   *
   * @param {*} event - Event to include in the next batch.
   */
  add(event) {
    this.batch.push(event);
    if (this.batch.length >= this.batchSize) {
      this.flush().catch((error) => this.handleFlushError(error));
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => {
        this.flush().catch((error) => this.handleFlushError(error));
      }, this.flushInterval);
    }
  }

  /**
   * Publishes all queued events as one batch and cancels the pending timer.
   * `publish` is invoked synchronously within this call, as before; the
   * returned promise lets callers wait for, and react to, the outcome.
   * Events of a batch whose publish fails are not re-queued.
   *
   * @returns {Promise<void>} Settles when the publish call has settled;
   *   resolves immediately when there is nothing to publish.
   */
  async flush() {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    if (this.batch.length === 0) {
      return;
    }
    // Detach the batch before publishing so events added while the publish is
    // in flight go into a fresh batch.
    const events = this.batch;
    this.batch = [];
    // Batch publish
    await this.pubsub.publish(this.eventName, { batch: events });
  }

  /**
   * Reports a failure of a flush started internally by `add` or by the timer,
   * where no caller is available to receive the rejection. Override to plug
   * in different reporting.
   *
   * @param {*} error - Reason the publish failed.
   */
  handleFlushError(error) {
    console.error('Batch publish failed:', error);
  }
}
// Create one publisher for the BATCH_EVENTS trigger (default size and interval)
const batchPublisher = new BatchPublisher(pubsub, 'BATCH_EVENTS');
// Queue two sample events; they are published together as one batch
for (const type of ['event1', 'event2']) {
  batchPublisher.add({ type, data: {} });
}
Subscription Throttling
/**
 * Subscription hook that exposes new data at most once per `throttleMs`.
 *
 * The first update after a quiet period is exposed immediately. An update
 * that arrives inside the throttle window is not discarded: it is exposed
 * when the window ends, unless a newer update has replaced it by then, so the
 * hook always ends up showing the most recent data.
 *
 * @param {object} query - Subscription document passed to `useSubscription`.
 * @param {object} options - Options passed to `useSubscription`.
 * @param {number} [throttleMs=1000] - Minimum time in milliseconds between
 *   two exposed updates.
 * @returns {{ data: *, loading: boolean }} Throttled data (null until the
 *   first update) and the loading flag from `useSubscription`.
 */
function useThrottledSubscription(query, options, throttleMs = 1000) {
  const { data, loading } = useSubscription(query, options);
  const [throttledData, setThrottledData] = useState(null);
  const lastUpdate = useRef(0);
  useEffect(() => {
    if (!data) {
      return undefined;
    }
    const elapsed = Date.now() - lastUpdate.current;
    if (elapsed > throttleMs) {
      setThrottledData(data);
      lastUpdate.current = Date.now();
      return undefined;
    }
    // Inside the throttle window: deliver this update once the window ends.
    const trailingTimer = setTimeout(() => {
      setThrottledData(data);
      lastUpdate.current = Date.now();
    }, throttleMs - elapsed);
    // Runs when newer data arrives or the component unmounts, so a
    // superseded update is never applied late.
    return () => clearTimeout(trailingTimer);
  }, [data, throttleMs]);
  return { data: throttledData, loading };
}
8. Subscription Monitoring
Connection Monitoring
// Counters for WebSocket connections: currently open, opened in total, and
// closed in total.
const connectionMetrics = {
  activeConnections: 0,
  totalConnections: 0,
  disconnections: 0,
};

// Prints the current counters.
const logConnectionMetrics = () => {
  console.log('Connection metrics:', connectionMetrics);
};

// A new client connected: one more connection overall and one more open.
function handleConnect() {
  connectionMetrics.totalConnections += 1;
  connectionMetrics.activeConnections += 1;
  logConnectionMetrics();
}

// A client disconnected: one fewer open connection, one more disconnection.
function handleDisconnect() {
  connectionMetrics.activeConnections -= 1;
  connectionMetrics.disconnections += 1;
  logConnectionMetrics();
}

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

useServer(
  {
    schema: server.schema,
    onConnect: handleConnect,
    onDisconnect: handleDisconnect,
  },
  wsServer,
);
Subscription Metrics
// Per-trigger publish statistics, keyed by event name.
const subscriptionMetrics = new Map();

/**
 * Records one publish for `eventName`: bumps its counter and stamps the time.
 * The entry is created with a zero count on first use.
 *
 * @param {string} eventName - Trigger name that was just published.
 */
function trackSubscription(eventName) {
  let entry = subscriptionMetrics.get(eventName);
  if (entry === undefined) {
    entry = { count: 0, lastPublished: null };
    subscriptionMetrics.set(eventName, entry);
  }
  entry.count += 1;
  entry.lastPublished = new Date();
}
// Track when publishing events
// Call trackSubscription right after each publish, with the same trigger
// name, so the per-trigger count and lastPublished time reflect what was sent.
// NOTE(review): `post` is not defined in this fragment — presumably the record
// produced by a mutation such as createPost; confirm at the call site.
pubsub.publish(POST_CREATED, { postCreated: post });
trackSubscription(POST_CREATED);
9. Subscription Best Practices
| Practice | Description |
|---|---|
| Use Redis PubSub | Support distributed deployment |
| Implement connection authentication | Ensure subscription security |
| Add error handling | Improve stability |
| Implement filtering mechanisms | Reduce unnecessary data pushes |
| Monitor connection status | Detect issues in time |
| Use batch publishing | Improve performance |
| Implement reconnection mechanism | Improve reliability |
| Limit subscription count | Prevent resource exhaustion |
| Set timeout | Avoid zombie connections |
| Log subscriptions | Facilitate debugging and analysis |
10. Common Issues and Solutions
| Issue | Cause | Solution |
|---|---|---|
| Frequent disconnections | Network instability, timeout | Implement auto-reconnect, increase timeout |
| High subscription latency | High server load, slow processing | Optimize performance, use batch publishing |
| Memory leak | Subscriptions not cleaned up properly | Ensure unsubscribe, clean up resources |
| Data inconsistency | Cache not updated | Implement cache invalidation |
| Security issues | Connection not verified | Implement connection authentication and authorization |