
How to handle Kafka message backlog?

February 21, 16:58

Kafka Message Backlog Handling

Kafka message backlog is a common problem in production environments: Consumers cannot keep up with Producers, so messages accumulate on the Brokers. Handling a backlog requires analysis and optimization across multiple dimensions.

Causes of Message Backlog

1. Insufficient Consumer Consumption Capacity

  • Single-threaded Consumption: Consumer uses single thread to process messages, slow processing speed
  • Complex Processing Logic: Message processing logic is complex and time-consuming
  • Slow External Dependencies: Slow response from external systems (databases, APIs, etc.)
  • Resource Limitations: Insufficient CPU, memory, network, and other resources

2. Excessive Producer Production Speed

  • Burst Traffic: A large volume of messages floods in within a short time
  • Improper Production Configuration: Batch sending configuration causes instantaneous high traffic
  • Business Peak Period: Message volume surges during business peak periods

3. System Failures

  • Consumer Failure: Consumer crashes or network interruption
  • Dependent System Failures: Database, cache, and other dependent system failures
  • Network Issues: Network latency or insufficient bandwidth

Monitoring and Diagnosis

Monitoring Metrics

```bash
# View Consumer Lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group my-group

# View the latest offset of each partition (end of the backlog)
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-topic --time -1
```

Key Metrics

  • Consumer Lag: how far the Consumer's committed offset trails the log end offset
  • Messages Per Second: message throughput in and out
  • Bytes Per Second: byte throughput in and out
  • Log Size: on-disk size of the Partition logs
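Consumer Lag is simple arithmetic: for each partition, the log end offset minus the group's committed offset. A minimal sketch of that computation in plain Java (in production you would fetch these offsets via `AdminClient` or the CLI above; the offset values here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCheck {
    // Lag per partition = log end offset minus the group's committed offset
    static Map<Integer, Long> computeLag(Map<Integer, Long> endOffsets,
                                         Map<Integer, Long> committed) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long c = committed.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), e.getValue() - c);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1000L, 1, 500L);
        Map<Integer, Long> committed = Map.of(0, 900L, 1, 500L);
        Map<Integer, Long> lag = computeLag(end, committed);
        System.out.println(lag.get(0) + "," + lag.get(1)); // 100,0
    }
}
```

Alerting on the sum (total backlog) and the per-partition maximum (a stuck partition) covers both failure modes.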

Solutions

1. Increase Consumer Count

Principle: Increase consumption capacity by increasing the number of Consumer instances

Implementation Steps:

```java
// Create multiple Consumer instances, one per thread
for (int i = 0; i < consumerCount; i++) {
    ConsumerThread thread = new ConsumerThread(properties);
    thread.start();
}
```

Considerations:

  • Consumers beyond the Partition count sit idle: each Partition is consumed by at most one Consumer in a group
  • When the Consumer count is at most the Partition count, every Consumer is assigned at least one Partition
  • Size the Consumer Group with the Partition count in mind
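The "idle consumer" point above can be demonstrated with a toy assignment. This is not Kafka's actual assignor (Kafka ships range, round-robin, sticky, and cooperative-sticky strategies); it is a round-robin sketch showing why consumers beyond the partition count buy nothing:

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {
    // Toy round-robin: partition p goes to consumer p % consumerCount.
    // Consumers left with an empty list are idle.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) result.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) result.get(p % consumers).add(p);
        return result;
    }

    public static void main(String[] args) {
        // 4 partitions, 6 consumers: the last two consumers get nothing
        List<List<Integer>> a = assign(4, 6);
        long idle = a.stream().filter(List::isEmpty).count();
        System.out.println("idle=" + idle); // idle=2
    }
}
```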

2. Increase Partition Count

Principle: Support more Consumers for parallel consumption by increasing Partition count

Implementation Steps:

```bash
# Increase the Topic's Partition count
kafka-topics --bootstrap-server localhost:9092 \
  --alter --topic my-topic --partitions 20
```

Considerations:

  • Increasing Partitions does not redistribute existing messages
  • New messages are spread across all Partitions, so the key-to-Partition mapping changes and per-key ordering is affected
  • Consumers pick up the new Partitions at the next rebalance; a restart is not strictly required
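The key-mapping caveat is easy to see numerically. Kafka's default partitioner hashes the key with murmur2; the sketch below uses `String.hashCode()` instead purely for illustration, but the effect is identical: the same key lands on a different partition once the partition count changes.

```java
public class PartitionMapping {
    // Illustrative only: Kafka's default partitioner uses murmur2, not
    // String.hashCode(), but the lesson is the same -- the key-to-partition
    // mapping changes when the partition count changes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int before = partitionFor("a", 10); // "a".hashCode() = 97 -> 7
        int after  = partitionFor("a", 20); // 97 % 20          -> 17
        System.out.println(before + "->" + after); // 7->17
    }
}
```

If strict per-key ordering matters, plan the partition count up front rather than growing it under load.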

3. Optimize Consumption Logic

Batch Processing:

```java
// Batch-process messages; flush the remainder after the loop
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
    batch.add(record);
    if (batch.size() >= BATCH_SIZE) {
        processBatch(batch);
        batch.clear();
    }
}
if (!batch.isEmpty()) {   // don't drop the trailing partial batch
    processBatch(batch);
    batch.clear();
}
consumer.commitSync();    // commit only after processing succeeds
```

Asynchronous Processing:

```java
// Use a thread pool for asynchronous processing
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
for (ConsumerRecord<String, String> record : records) {
    executor.submit(() -> processMessage(record));
}
// Caution: commit offsets only after the submitted tasks complete,
// otherwise a crash can lose in-flight messages.
```

Optimize External Dependencies:

  • Use connection pools
  • Add caching
  • Batch database operations
  • Asynchronously call external APIs
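The "batch database operations" idea boils down to buffering writes and flushing them in groups. A minimal, self-contained sketch (the `flushFn` callback stands in for a real bulk insert; names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchedWriter<T> {
    // Buffers items and hands them to flushFn in groups of batchSize,
    // turning N single-row writes into roughly N/batchSize bulk writes.
    private final int batchSize;
    private final Consumer<List<T>> flushFn;
    private final List<T> buffer = new ArrayList<>();

    BatchedWriter(int batchSize, Consumer<List<T>> flushFn) {
        this.batchSize = batchSize;
        this.flushFn = flushFn;
    }

    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) flush();
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushFn.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> flushSizes = new ArrayList<>();
        BatchedWriter<String> w = new BatchedWriter<>(3, b -> flushSizes.add(b.size()));
        for (int i = 0; i < 7; i++) w.add("msg" + i);
        w.flush(); // flush the trailing partial batch
        System.out.println(flushSizes); // [3, 3, 1]
    }
}
```

The same shape works for caching lookups and for coalescing external API calls.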

4. Adjust Consumer Configuration

```properties
# Increase number of messages fetched per poll
max.poll.records=1000
# Increase poll interval
max.poll.interval.ms=300000
# Increase session timeout
session.timeout.ms=30000
# Increase heartbeat interval
heartbeat.interval.ms=3000
```

5. Temporary Scaling Solution

Create Temporary Topic:

```bash
# Create temporary Topic
kafka-topics --bootstrap-server localhost:9092 \
  --create --topic my-topic-temp --partitions 50 \
  --replication-factor 3
```

Migrate Messages:

```bash
# Use MirrorMaker to migrate messages
kafka-run-class kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist my-topic
```

Add Temporary Consumers:

  • Deploy a large number of temporary Consumers
  • Quickly consume backlog messages
  • Decommission temporary Consumers after consumption completes

6. Discard Non-critical Messages

Selective Consumption:

```java
// Option 1: skip the entire backlog and consume only new messages
consumer.seekToEnd(partitions);

// Option 2: keep only the most recent messages, skipping the rest
long endOffset = consumer.position(partition);           // position after seekToEnd
long targetOffset = Math.max(0, endOffset - KEEP_COUNT); // keep the last KEEP_COUNT messages
consumer.seek(partition, targetOffset);
```

Considerations:

  • Only applicable to non-critical business
  • Need to evaluate the impact of data loss
  • Recommend backing up backlog messages first

Preventive Measures

1. Capacity Planning

  • Evaluate business peak traffic
  • Reserve sufficient Consumer instances
  • Reasonably set Partition count
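The capacity-planning bullets above reduce to one division: required Consumers = peak rate times a headroom factor, divided by measured per-instance throughput, rounded up. A sketch with assumed numbers (50k msg/s peak and 8k msg/s per Consumer are placeholders; measure your own):

```java
public class CapacityPlan {
    public static void main(String[] args) {
        double peakMsgsPerSec = 50_000;       // assumed business peak
        double perConsumerMsgsPerSec = 8_000; // measured per-instance throughput
        double headroom = 1.3;                // 20-30% resource buffer

        // Round up: a fractional consumer is a whole extra instance.
        // The result must not exceed the Partition count, or the
        // extra instances sit idle.
        int needed = (int) Math.ceil(peakMsgsPerSec * headroom / perConsumerMsgsPerSec);
        System.out.println(needed);
    }
}
```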

2. Monitoring and Alerting

  • Set Consumer Lag alert thresholds
  • Monitor message backlog trends
  • Timely discover and handle issues

3. Rate Limiting Strategy

```java
// Producer-side rate limiting (Guava's RateLimiter)
RateLimiter rateLimiter = RateLimiter.create(1000); // 1000 permits per second
rateLimiter.acquire(); // blocks until a permit is available
producer.send(record);
```

4. Degradation Strategy

  • Degrade non-core functions during peak periods
  • Reduce message processing complexity
  • Use simplified logic to process messages

Best Practices

  1. Reasonably Plan Resources

    • Evaluate required Consumer count based on QPS
    • Reserve 20-30% resource buffer
    • Consider traffic during business peak periods
  2. Optimize Consumption Logic

    • Simplify message processing logic
    • Use batch processing to improve efficiency
    • Reduce calls to external dependencies
  3. Establish Monitoring System

    • Real-time monitoring of Consumer Lag
    • Set up multi-level alert mechanisms
    • Regularly check system health status
  4. Develop Emergency Plans

    • Prepare temporary scaling solutions
    • Establish message backup mechanisms
    • Develop degradation strategies
  5. Regular Drills

    • Simulate message backlog scenarios
    • Verify scaling solutions
    • Test emergency plans

By comprehensively applying the above solutions, Kafka message backlog problems can be effectively resolved, ensuring system stability and reliability.

Tags: Kafka