Kafka Message Duplicate Consumption and Solutions
In distributed systems, message duplicate consumption is a common problem. Although Kafka provides multiple mechanisms to prevent message loss, duplicate consumption can still occur in certain situations.
Causes of Message Duplication
1. Producer Side Duplication
- Network Fluctuation: the Producer sends a message but never receives the acknowledgment, so its retry delivers the same message a second time
- Leader Switch: during a partition Leader switch, a Producer retry can land the same message on the new Leader even though the old Leader already persisted it
- Idempotence Not Enabled: without Producer idempotence, the Broker has no way to detect and discard these retried duplicates
2. Broker Side Duplication
- Replica Sync Issues: replica synchronization lag during failover can re-expose records that were already consumed
- Offset Commit Failure: a failed Offset commit leaves the committed position behind the processed position, so those records are read again
- Rebalance: a Consumer Group Rebalance reassigns partitions, and the new owner restarts from the last committed Offset
3. Consumer Side Duplication
- Manual Commit Failure: the manual Offset commit fails, so the same message is delivered and consumed again
- Processing Timeout: message processing takes longer than the poll interval, the consumer is evicted from the group, and the resulting Rebalance hands its partitions (and uncommitted messages) to another consumer
- Abnormal Restart: after a crash, the Consumer resumes from the last committed Offset and re-reads everything processed since then
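Several of these causes are timeout-driven, so the consumer's timing parameters directly affect how often Rebalance-triggered replays occur. A sketch of the relevant settings (the values here are illustrative examples, not recommendations):

```properties
# Max time between poll() calls before the consumer is evicted from the group
max.poll.interval.ms=300000
# Smaller batches finish faster, keeping each poll() cycle within the interval
max.poll.records=200
# Heartbeat window for failure detection
session.timeout.ms=45000
# Heartbeat frequency, typically about 1/3 of session.timeout.ms
heartbeat.interval.ms=15000
```

If processing a batch can exceed `max.poll.interval.ms`, either reduce `max.poll.records` or move slow work off the polling thread.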
Solutions
1. Producer Side Idempotence
```properties
# Enable Producer idempotence
enable.idempotence=true
# Retry count (must be greater than 0 for idempotent retries to happen)
retries=3
# Maximum in-flight requests per connection (must be <= 5 when idempotence is enabled)
max.in.flight.requests.per.connection=5
```
Principle: Kafka assigns a PID to each Producer and a sequence number to each message batch per partition. The Broker uses the (PID, partition, sequence number) combination to detect a retried duplicate and discard it instead of appending it again.
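The Broker-side check can be pictured with a toy model: remember the last sequence number accepted for each (PID, partition) and reject anything at or below it. This is a simplified illustration, not Kafka's actual implementation; class and method names are invented for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the duplicate check behind Producer idempotence:
// the broker tracks the last sequence number seen per (PID, partition)
// and treats a replayed sequence number as a duplicate retry.
public class SequenceDedup {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the append is accepted, false if it is a duplicate retry. */
    public boolean tryAppend(long pid, int partition, int sequence) {
        String key = pid + "-" + partition;
        Integer last = lastSeq.get(key);
        if (last != null && sequence <= last) {
            return false;          // duplicate: already persisted, just ack again
        }
        lastSeq.put(key, sequence);
        return true;               // new record: append to the log
    }

    public static void main(String[] args) {
        SequenceDedup broker = new SequenceDedup();
        System.out.println(broker.tryAppend(1L, 0, 0)); // true  (first write)
        System.out.println(broker.tryAppend(1L, 0, 0)); // false (retry of the same batch)
        System.out.println(broker.tryAppend(1L, 0, 1)); // true  (next batch)
    }
}
```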
2. Consumer Side Idempotence Processing
Database Unique Index
```sql
-- Create a unique index so a replayed message fails to insert a second time
CREATE UNIQUE INDEX idx_unique_id ON messages (message_id);
```
Redis Deduplication
```java
// Use a Redis Set to record processed message IDs
String key = "processed_messages:" + topic;
// SADD returns the number of members actually added: 1 = first time, 0 = already present
Long added = redisTemplate.opsForSet().add(key, messageId);
if (added != null && added == 1) {
    // First time seeing this ID: safe to process
    processMessage(message);
}
```
State Machine Deduplication
```java
// Use a state machine to track processing status
enum MessageState { NEW, PROCESSING, PROCESSED, FAILED }
// State transitions: NEW -> PROCESSING -> PROCESSED (or FAILED)
// A message already in PROCESSING or PROCESSED is not processed again
```
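The enum only names the states; the deduplication comes from making the NEW → PROCESSING transition atomic, so that two consumers racing on the same message ID cannot both win it. A minimal in-memory sketch (invented class names; production code would keep this state in a database or Redis so it survives restarts):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of state-machine deduplication: only one caller can move a message
// from NEW to PROCESSING, because replace() is an atomic compare-and-set.
public class MessageStateMachine {
    enum State { NEW, PROCESSING, PROCESSED, FAILED }

    private final ConcurrentMap<String, State> states = new ConcurrentHashMap<>();

    /** Returns true only for the caller that wins the right to process the message. */
    public boolean tryStart(String messageId) {
        states.putIfAbsent(messageId, State.NEW);
        // Atomic NEW -> PROCESSING: fails if another caller already took it
        return states.replace(messageId, State.NEW, State.PROCESSING);
    }

    public void markProcessed(String messageId) {
        states.replace(messageId, State.PROCESSING, State.PROCESSED);
    }

    public State stateOf(String messageId) {
        return states.get(messageId);
    }
}
```

In a database-backed version, the same compare-and-set becomes `UPDATE messages SET state = 'PROCESSING' WHERE message_id = ? AND state = 'NEW'`, checking that exactly one row was updated.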
3. Transactional Messages
```java
// Register the transactional ID once, before the first transaction
producer.initTransactions();

// Start a transaction
producer.beginTransaction();
try {
    // Send the message
    producer.send(record);
    // Update the database
    updateDatabase(data);
    // Commit the transaction
    producer.commitTransaction();
} catch (Exception e) {
    // Abort: consumers with isolation.level=read_committed will never see the message
    producer.abortTransaction();
}
```
4. Offset Commit Strategy
```properties
# Disable auto commit so the application controls when Offsets are persisted
enable.auto.commit=false
```

```java
// Commit Offsets manually after processing
consumer.commitSync();   // synchronous: blocks until the commit succeeds or fails
consumer.commitAsync();  // asynchronous: faster, but failures must be handled in a callback
```
Best Practice: commit the Offset manually only after message processing completes. This yields at-least-once delivery, so the processing itself must be idempotent to tolerate the replays that a crash between processing and commit will cause.
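Why commit-after-processing still needs idempotence can be shown with a small simulation (plain Java, not the Kafka API): a consumer crashes after processing but before committing, restarts from the last committed offset, and the replayed messages are absorbed by an ID-based deduplication set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustration of at-least-once delivery with commit-after-processing:
// a restart re-reads from the last committed offset, so the business logic
// deduplicates by message ID to keep the side effect effectively once.
public class ReplaySimulation {
    /**
     * Consumes the log from `committedOffset`, deduplicating by message ID.
     * Returns the IDs whose side effect actually ran in this round.
     */
    public static List<String> consume(List<String> log, int committedOffset, Set<String> processedIds) {
        List<String> applied = new ArrayList<>();
        for (int offset = committedOffset; offset < log.size(); offset++) {
            String messageId = log.get(offset);
            if (processedIds.add(messageId)) {   // idempotence check: false if already seen
                applied.add(messageId);          // side effect runs once per ID
            }
            // the Offset commit would happen here, after processing
        }
        return applied;
    }
}
```

A first run applies every message; a rerun from the same committed offset (as after a crash) applies none, because the ID set rejects the replays.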
Best Practices
1. Design Idempotent Interfaces
- Design all business interfaces to be idempotent
- Use unique identifiers to distinguish duplicate requests
- Ensure repeated executions produce the same result
2. Reasonably Configure Parameters
- Enable Producer idempotence
- Disable Consumer auto commit
- Set timeout values appropriately
3. Monitor Duplicate Consumption
- Monitor the message duplication rate
- Log occurrences of duplicate consumption
- Discover and handle issues promptly
4. Testing and Verification
- Simulate network failures
- Simulate Broker crashes
- Verify the idempotence mechanisms
5. Business Layer Processing
- Implement idempotence logic at the business layer
- Use database constraints to prevent duplicates
- Record processing status to avoid reprocessing
Trade-off Between Performance and Reliability
- Idempotence processing adds system complexity
- It requires extra storage to record processing status
- It costs some performance but markedly improves reliability
- For critical business flows, idempotence is mandatory
By implementing corresponding idempotence mechanisms at the Producer, Broker, and Consumer levels, Kafka message duplicate consumption can be effectively avoided, ensuring system reliability and data consistency.