乐闻世界logo
搜索文章和话题

服务端面试题手册

Kafka 如何保证消息的顺序性?

Kafka 消息顺序性保证Kafka 在 Partition 级别保证消息的顺序性,这是 Kafka 设计的一个重要特性。分区内有序性保证机制:Kafka 保证同一个 Partition 内的消息按照发送顺序被消费实现原理:每个 Partition 内部维护一个有序的消息队列,消息按照追加顺序写入消费顺序:Consumer 从 Partition 读取消息时,严格按照写入顺序消费跨分区无序性Topic 级别:如果 Topic 有多个 Partition,则无法保证 Topic 级别的消息顺序原因:不同 Partition 之间的消息是并行处理的,无法保证全局顺序影响:相关消息可能被分配到不同 Partition,导致消费顺序不一致保证顺序性的方法单分区策略将需要保证顺序的消息发送到同一个 Partition使用相同的 Key,Kafka 会根据 Key 进行 Hash 分配到同一 Partition适用于顺序性要求高的场景自定义分区器实现 Partitioner 接口根据业务逻辑自定义分区规则确保相关消息路由到同一 Partition单 Consumer 消费在 Consumer Group 中只有一个 Consumer 消费该 Topic避免多 Consumer 并行消费导致乱序会降低消费性能实践建议对于需要严格顺序的场景,使用单 Partition对于可以容忍部分乱序的场景,使用多 Partition 提高性能合理设置消息 Key,确保相关消息在同一 Partition监控 Consumer 的消费进度,避免消息积压性能与顺序的权衡单分区保证顺序但性能受限多分区提高性能但牺牲顺序性需要根据业务需求在两者之间找到平衡点在实际应用中,大多数场景不需要全局顺序,只需要保证相关消息的顺序即可,此时通过合理的 Key 设计和分区策略可以在性能和顺序性之间取得良好平衡。
阅读 0·2月21日 16:58

Kafka 如何解决消息重复消费的问题?

Kafka 消息重复消费及解决方案在分布式系统中,消息重复消费是一个常见问题。Kafka 虽然提供了多种机制来避免消息丢失,但在某些情况下仍可能出现重复消费的情况。消息重复的原因1. Producer 端重复网络抖动:Producer 发送消息后未收到确认,重试导致重复发送Leader 切换:Leader 切换过程中,Producer 可能发送多次幂等性未开启:未启用 Producer 幂等性,导致重复发送2. Broker 端重复副本同步问题:副本同步延迟导致重复消费Offset 提交失败:Consumer 提交 Offset 失败,导致重复消费Rebalance:Consumer Group Rebalance 导致重复消费3. Consumer 端重复手动提交失败:手动提交 Offset 失败,消息被重复消费处理超时:消息处理时间过长,触发 Rebalance 导致重复消费异常重启:Consumer 异常重启,从上次提交的 Offset 开始消费解决方案1. Producer 端幂等性# 开启 Producer 幂等性enable.idempotence=true# 设置重试次数retries=3# 设置最大未确认请求max.in.flight.requests.per.connection=5原理:Kafka 为每个 Producer 分配一个 PID,并为每条消息分配序列号,Broker 端通过 PID 和序列号判断消息是否重复。2. Consumer 端幂等性处理数据库唯一索引-- 创建唯一索引防止重复插入CREATE UNIQUE INDEX idx_unique_id ON messages (message_id);Redis 去重// 使用 Redis Set 存储已处理的消息 IDString key = "processed_messages:" + topic;Boolean isNew = redisTemplate.opsForSet().add(key, messageId);if (isNew != null && isNew == 1) { // 首次处理 processMessage(message);}状态机去重// 使用状态机记录处理状态enum MessageState { NEW, PROCESSING, PROCESSED, FAILED}// 状态转换:NEW -> PROCESSING -> PROCESSED// 避免重复处理3. 事务消息// 开启事务producer.beginTransaction();try { // 发送消息 producer.send(record); // 更新数据库 updateDatabase(data); // 提交事务 producer.commitTransaction();} catch (Exception e) { // 回滚事务 producer.abortTransaction();}4. Offset 提交策略# 禁用自动提交enable.auto.commit=false# 手动提交 Offsetconsumer.commitSync();# 异步提交 Offsetconsumer.commitAsync();最佳实践:在消息处理完成后手动提交 Offset,确保消息处理和 Offset 提交的原子性。最佳实践设计幂等接口所有业务接口设计为幂等使用唯一标识符区分重复请求确保多次执行结果一致合理配置参数开启 Producer 幂等性禁用 Consumer 自动提交合理设置超时时间监控重复消费监控消息重复率记录重复消费日志及时发现和处理问题测试验证模拟网络故障模拟 Broker 宕机验证幂等性机制业务层处理在业务层实现幂等逻辑使用数据库约束防止重复记录处理状态避免重复性能与可靠性的权衡幂等性处理会增加系统复杂度需要额外的存储空间记录处理状态会轻微影响性能,但提高了可靠性在关键业务中必须实现幂等性通过在 Producer、Broker 和 Consumer 端都实现相应的幂等性机制,可以有效避免 Kafka 消息重复消费的问题,确保系统的可靠性和数据一致性。
阅读 0·2月21日 16:58

Kafka 消息积压如何处理?

Kafka 消息积压处理Kafka 消息积压是生产环境中常见的问题,通常表现为 Consumer 消费速度跟不上 Producer 生产速度,导致消息在 Broker 中堆积。处理消息积压需要从多个维度进行分析和优化。消息积压的原因1. Consumer 消费能力不足单线程消费:Consumer 使用单线程处理消息,处理速度慢处理逻辑复杂:消息处理逻辑复杂,耗时较长外部依赖慢:依赖外部系统(数据库、API 等)响应慢资源限制:CPU、内存、网络等资源不足2. Producer 生产速度过快突发流量:短时间内大量消息涌入生产配置不当:批量发送配置导致瞬时流量大业务高峰期:业务高峰期消息量激增3. 系统故障Consumer 故障:Consumer 宕机或网络中断依赖系统故障:数据库、缓存等依赖系统故障网络问题:网络延迟或带宽不足监控和诊断监控指标# 查看 Consumer Lagkafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group my-group# 查看消息积压情况kafka-run-class kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic my-topic --time -1关键指标Consumer Lag:Consumer 消费延迟Messages Per Second:每秒消息数量Bytes Per Second:每秒字节数Log Size:日志文件大小解决方案1. 增加 Consumer 数量原理:通过增加 Consumer 实例数量提高消费能力实施步骤:// 创建多个 Consumer 实例for (int i = 0; i < consumerCount; i++) { ConsumerThread thread = new ConsumerThread(properties); thread.start();}注意事项:Consumer 数量不能超过 Partition 数量每个 Consumer 至少分配一个 Partition合理设置 Consumer Group 的 Consumer 数量2. 增加 Partition 数量原理:通过增加 Partition 数量支持更多 Consumer 并行消费实施步骤:# 增加 Topic 的 Partition 数量kafka-topics --bootstrap-server localhost:9092 \ --alter --topic my-topic --partitions 20注意事项:增加 Partition 不会重新分配现有消息新消息会分配到新的 Partition需要重启 Consumer 才能生效3. 优化消费逻辑批量处理:// 批量处理消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));List<Record> batch = new ArrayList<>();for (ConsumerRecord<String, String> record : records) { batch.add(record); if (batch.size() >= BATCH_SIZE) { processBatch(batch); batch.clear(); }}异步处理:// 使用线程池异步处理ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);for (ConsumerRecord<String, String> record : records) { executor.submit(() -> processMessage(record));}优化外部依赖:使用连接池增加缓存批量操作数据库异步调用外部 API4. 调整 Consumer 配置# 增加每次拉取的消息数max.poll.records=1000# 增加拉取间隔max.poll.interval.ms=300000# 增加会话超时时间session.timeout.ms=30000# 增加心跳间隔heartbeat.interval.ms=30005. 临时扩容方案创建临时 Topic:# 创建临时 Topickafka-topics --bootstrap-server localhost:9092 \ --create --topic my-topic-temp --partitions 50 \ --replication-factor 3迁移消息:# 使用 MirrorMaker 迁移消息kafka-run-class kafka.tools.MirrorMaker \ --consumer.config consumer.properties \ --producer.config producer.properties \ --whitelist my-topic增加临时 Consumer:部署大量临时 Consumer快速消费积压消息消费完成后下线临时 Consumer6. 丢弃非关键消息选择性消费:// 只消费最新的消息consumer.seekToEnd(partitions);// 跳过积压的消息long currentOffset = consumer.position(partition);long targetOffset = currentOffset - SKIP_COUNT;consumer.seek(partition, targetOffset);注意事项:仅适用于非关键业务需要评估数据丢失的影响建议先备份积压的消息预防措施1. 容量规划评估业务峰值流量预留足够的 Consumer 实例合理设置 Partition 数量2. 监控告警设置 Consumer Lag 告警阈值监控消息积压趋势及时发现和处理问题3. 限流策略// Producer 端限流RateLimiter rateLimiter = RateLimiter.create(1000); // 1000 msg/srateLimiter.acquire();producer.send(record);4. 降级策略在高峰期降级非核心功能减少消息处理复杂度使用简化逻辑处理消息最佳实践合理规划资源根据 QPS 评估所需 Consumer 数量预留 20-30% 的资源缓冲考虑业务高峰期的流量优化消费逻辑简化消息处理逻辑使用批量处理提高效率减少外部依赖的调用建立监控体系实时监控 Consumer Lag设置多级告警机制定期检查系统健康状态制定应急预案准备临时扩容方案建立消息备份机制制定降级策略定期演练模拟消息积压场景验证扩容方案测试应急预案通过综合运用以上方案,可以有效解决 Kafka 消息积压问题,确保系统的稳定性和可靠性。
阅读 0·2月21日 16:58

Kafka 的副本机制是如何工作的?

Kafka 副本机制Kafka 的副本机制是其高可用性和容错性的核心。通过副本机制,Kafka 可以在节点故障时保证数据不丢失,并持续提供服务。副本基本概念副本角色Leader 副本负责处理所有的读写请求每个 Partition 只有一个 LeaderLeader 所在的 Broker 处理所有 Producer 和 Consumer 请求Follower 副本从 Leader 同步数据不处理客户端请求可以成为新的 LeaderISR(In-Sync Replicas)与 Leader 保持同步的副本集合ISR 中的副本数据与 Leader 完全一致只有 ISR 中的副本才有资格被选为新的 Leader副本同步机制同步过程Producer 发送消息Producer 将消息发送到 LeaderLeader 将消息写入本地日志Leader 同步到 FollowerLeader 将消息发送给 ISR 中的所有 FollowerFollower 接收消息并写入本地日志Follower 向 Leader 发送确认确认机制Leader 收到 ISR 中所有 Follower 的确认后,向 Producer 返回成功根据 acks 参数决定等待确认的数量同步配置# 副本因子default.replication.factor=3# 最小同步副本数min.insync.replicas=2# 副本最大延迟时间replica.lag.time.max.ms=30000# 副本最大延迟消息数replica.lag.max.messages=4000Leader 选举机制选举触发条件Leader 故障Leader 所在 Broker 宕机Leader 网络分区Controller 故障Controller 负责管理集群状态Controller 故障时重新选举选举过程检测故障ZooKeeper 检测到 Leader 失效Controller 收到故障通知选择新 Leader从 ISR 中选择 AR(Assigned Replicas)中排名靠前的副本优先选择在 ISR 中的副本如果 ISR 为空,从 AR 中选择更新元数据Controller 更新 ZooKeeper 中的元数据通知所有 Broker 新的 Leader 信息选举策略AR(Assigned Replicas):分配的所有副本ISR(In-Sync Replicas):与 Leader 同步的副本OSR(Out-of-Sync Replicas):未与 Leader 同步的副本副本管理副本分配# 自动创建 Topic 的副本因子default.replication.factor=3# Topic 级别副本因子replication.factor=3分配原则:副本均匀分布在不同的 Broker 上同一个 Partition 的副本不在同一个 Broker考虑机架感知,副本分布在不同机架副本下线优雅下线使用 kafka-reassign-partitions 工具先迁移 Leader,再下线副本保证数据不丢失故障下线自动触发 Leader 选举从 ISR 中选择新 Leader重建副本保证副本数容错机制故障场景处理Follower 故障Follower 从 ISR 中移除Leader 继续服务Follower 恢复后重新加入 ISRLeader 故障触发 Leader 选举从 ISR 中选择新 Leader保证数据一致性多个副本故障如果 ISR 中副本数 >= min.insync.replicas,继续服务如果 ISR 中副本数 < min.insync.replicas,拒绝写入性能优化副本数选择副本数 = 1:无容错,性能最好副本数 = 2:单点容错,性能较好副本数 = 3:推荐配置,平衡性能和可靠性副本数 > 3:高可靠性,但性能下降同步优化# 减少同步延迟replica.lag.time.max.ms=10000# 优化网络配置socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400# 优化 I/O 配置num.io.threads=16监控指标副本同步指标UnderReplicatedPartitions:未完全同步的分区数IsrShrinksPerSec:ISR 缩减速率IsrExpandsPerSec:ISR 扩张速率OfflineReplicasCount:离线副本数Leader 选举指标LeaderElectionRate:Leader 选举速率ActiveControllerCount:活跃 Controller 数量最佳实践合理设置副本数生产环境建议至少 3 个副本根据业务重要性调整副本数考虑存储成本和性能影响监控副本状态定期检查 ISR 状态监控副本同步延迟及时处理副本异常规划 Broker 分布副本分布在不同物理机考虑机架和机房分布避免单点故障定期测试模拟 Broker 故障验证容错机制测试恢复时间备份策略定期备份 Kafka 数据建立灾难恢复方案测试备份恢复流程通过合理配置和管理 Kafka 副本机制,可以在保证数据可靠性的同时提供良好的性能表现。
阅读 0·2月21日 16:58

Kafka 事务消息是如何工作的?

Kafka 事务消息Kafka 事务消息是 Kafka 0.11 版本引入的重要特性,它允许 Producer 将多条消息发送到多个 Topic 和 Partition,并保证这些消息要么全部成功,要么全部失败。这对于需要保证数据一致性的场景非常重要。事务消息的基本概念1. 事务定义Kafka 事务是指一组消息的原子性操作,这组消息要么全部成功提交,要么全部回滚。特点:原子性:事务中的所有消息要么全部成功,要么全部失败一致性:事务执行后,系统状态保持一致隔离性:事务执行期间,其他事务不会看到中间状态持久性:事务提交后,结果永久保存2. 事务 ID每个 Producer 需要配置一个唯一的事务 ID(transactional.id)。作用:标识 Producer 的事务身份用于故障恢复和幂等性保证确保 Producer 重启后能够恢复未完成的事务配置:# 事务 IDtransactional.id=my-transactional-id-1事务消息的工作原理1. 事务初始化Producer 启动时,会向 Coordinator 注册事务 ID。过程:Producer 向 Coordinator 发送注册请求Coordinator 记录事务 ID 和 Producer 的映射关系Coordinator 为 Producer 分配一个 PID(Producer ID)2. 事务开始Producer 调用 beginTransaction() 开始一个新事务。过程:Producer 向 Coordinator 请求开始事务Coordinator 记录事务开始时间Producer 开始收集消息3. 发送消息Producer 在事务中发送消息到多个 Topic 和 Partition。过程:Producer 将消息发送到 BrokerBroker 将消息写入日志,但不标记为可消费Broker 记录消息属于当前事务4. 事务提交或回滚Producer 调用 commitTransaction() 或 abortTransaction()。提交过程:Producer 向 Coordinator 发送提交请求Coordinator 向所有相关 Broker 发送提交标记Broker 将事务中的消息标记为可消费Coordinator 记录事务完成回滚过程:Producer 向 Coordinator 发送回滚请求Coordinator 向所有相关 Broker 发送回滚标记Broker 删除事务中的消息Coordinator 记录事务回滚事务消息的配置Producer 配置# 启用事务支持enable.idempotence=true# 事务 IDtransactional.id=my-transactional-id-1# 事务超时时间transaction.timeout.ms=60000# 重试次数retries=Integer.MAX_VALUE# 最大未确认请求max.in.flight.requests.per.connection=5Broker 配置# 事务状态日志副本数transaction.state.log.replication.factor=3# 事务状态日志最小同步副本数transaction.state.log.min.isr=2# 事务状态日志段大小transaction.state.log.segment.bytes=104857600# 事务超时时间transactional.id.expiration.ms=604800000事务消息的使用基本使用示例// 创建 ProducerProperties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "my-transactional-id-1");props.put("enable.idempotence", "true");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务producer.initTransactions();try { // 开始事务 producer.beginTransaction(); // 发送消息到 Topic1 producer.send(new ProducerRecord<>("topic1", "key1", "value1")); // 发送消息到 Topic2 producer.send(new ProducerRecord<>("topic2", "key2", "value2")); // 提交事务 producer.commitTransaction();} catch (Exception e) { // 回滚事务 producer.abortTransaction();}与数据库事务集成// 创建 ProducerKafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();// 获取数据库连接Connection conn = dataSource.getConnection();try { // 开始 Kafka 事务 producer.beginTransaction(); // 开始数据库事务 conn.setAutoCommit(false); // 发送 Kafka 消息 producer.send(new ProducerRecord<>("topic1", "key1", "value1")); // 执行数据库操作 Statement stmt = conn.createStatement(); stmt.executeUpdate("INSERT INTO table1 VALUES (1, 'data')"); // 提交数据库事务 conn.commit(); // 提交 Kafka 事务 producer.commitTransaction();} catch (Exception e) { // 回滚数据库事务 conn.rollback(); // 回滚 Kafka 事务 producer.abortTransaction();} finally { conn.close();}事务消息的隔离级别Read Committed(默认)Consumer 只能读取已提交事务的消息。特点:保证数据一致性避免读取未提交的数据适用于大多数场景配置:# Consumer 配置isolation.level=read_committedRead UncommittedConsumer 可以读取所有消息,包括未提交事务的消息。特点:性能更好可能读取到未提交的数据适用于对一致性要求不高的场景配置:# Consumer 配置isolation.level=read_uncommitted事务消息的应用场景1. 数据一致性保证场景描述:需要保证多个系统之间的数据一致性。示例:订单系统和库存系统支付系统和账务系统用户中心和权限系统2. 幂等性保证场景描述:需要保证消息不重复处理。示例:支付通知订单状态更新库存扣减3. 事件溯源场景描述:需要记录所有状态变更事件。示例:账户交易记录订单状态流转系统操作日志事务消息的性能影响性能开销网络开销需要与 Coordinator 通信需要与多个 Broker 通信增加了网络往返次数存储开销需要存储事务状态需要存储事务日志增加了磁盘 I/O延迟开销需要等待事务提交需要等待所有 Broker 确认增加了端到端延迟性能优化批量提交在一个事务中发送多条消息减少事务提交次数提高吞吐量合理设置超时时间根据业务需求设置事务超时时间避免过长的事务超时时间平衡可靠性和性能优化网络配置增加 Broker 网络带宽减少 Coordinator 和 Broker 之间的网络延迟优化网络拓扑事务消息的监控监控指标TransactionStarted:事务开始次数TransactionCommitted:事务提交次数TransactionAborted:事务回滚次数TransactionTimeout:事务超时次数监控命令# 查看事务状态kafka-transactions --bootstrap-server localhost:9092 \ --describe --transactional-id my-transactional-id-1# 查看 Producer 事务状态kafka-producer-perf-test --topic test-topic \ --num-records 1000 --record-size 1000 \ --throughput 10000 --producer-props \ enable.idempotence=true \ transactional.id=my-transactional-id-1最佳实践1. 合理设计事务范围事务中包含的消息数量要适中避免长时间运行的事务根据业务需求设计事务边界2. 处理事务失败实现事务失败重试机制记录事务失败日志建立事务补偿机制3. 监控事务状态实时监控事务提交和回滚监控事务超时情况及时发现和处理异常4. 优化性能批量发送消息合理设置事务超时时间优化网络和存储配置通过合理使用 Kafka 事务消息,可以在分布式系统中实现强一致性保证,同时保持系统的高性能和可用性。
阅读 0·2月21日 16:58

Kafka 支持哪些压缩算法?如何选择?

Kafka 消息压缩Kafka 支持消息压缩功能,通过压缩消息可以显著减少网络传输带宽和磁盘存储空间,同时提高系统的整体吞吐量。理解 Kafka 的消息压缩机制对于性能优化和资源规划非常重要。压缩算法Kafka 支持多种压缩算法,每种算法都有其特点和适用场景。1. Gzip特点:压缩率高CPU 消耗较高压缩和解压速度较慢适用于文本数据适用场景:网络带宽有限存储成本高对延迟要求不高配置:compression.type=gzip2. Snappy特点:压缩率中等CPU 消耗低压缩和解压速度快平衡性能和压缩率适用场景:需要平衡性能和压缩率CPU 资源有限对延迟有一定要求配置:compression.type=snappy3. LZ4特点:压缩率较低CPU 消耗极低压缩和解压速度最快适用于对性能要求极高的场景适用场景:对性能要求极高CPU 资源紧张对压缩率要求不高配置:compression.type=lz44. Zstd特点:压缩率高(接近 Gzip)CPU 消耗中等压缩和解压速度较快Kafka 2.1.0+ 支持适用场景:需要高压缩率对性能有一定要求Kafka 版本较新配置:compression.type=zstd压缩级别部分压缩算法支持压缩级别配置,可以在压缩率和性能之间进行权衡。Gzip 压缩级别# 压缩级别:1-9,默认 6compression.level=6级别 1:压缩率最低,速度最快级别 6:平衡压缩率和速度(默认)级别 9:压缩率最高,速度最慢Zstd 压缩级别# 压缩级别:1-19,默认 3compression.level=3级别 1:压缩率最低,速度最快级别 3:平衡压缩率和速度(默认)级别 19:压缩率最高,速度最慢压缩配置Producer 配置# 压缩类型:none, gzip, snappy, lz4, zstdcompression.type=snappy# 压缩级别(部分算法支持)compression.level=6# 批量发送大小(影响压缩效果)batch.size=16384# 批量发送等待时间linger.ms=5Broker 配置# 是否启用压缩(Producer 覆盖此配置)compression.type=producer# 线程数配置num.network.threads=8num.io.threads=16压缩原理1. Producer 端压缩压缩时机:Producer 将消息收集到批量缓冲区当满足批量发送条件时,对整个批次进行压缩压缩后的批次发送到 Broker压缩单位:以批次为单位进行压缩批次越大,压缩效果越好批次越小,压缩效果越差2. Broker 端处理存储策略:Broker 接收压缩后的批次直接存储压缩后的数据不解压消息(除非需要)转发策略:Broker 将压缩后的批次转发给 FollowerFollower 存储压缩后的数据减少网络传输和磁盘 I/O3. Consumer 端解压解压时机:Consumer 拉取压缩后的批次Consumer 端解压批次中的消息解压后的消息传递给应用解压单位:以批次为单位进行解压批次越大,解压开销相对越小批次越小,解压开销相对越大压缩效果压缩率对比| 数据类型 | Gzip | Snappy | LZ4 | Zstd ||---------|------|--------|-----|------|| 文本数据 | 70-80% | 50-60% | 40-50% | 65-75% || JSON 数据 | 75-85% | 55-65% | 45-55% | 70-80% || 日志数据 | 65-75% | 45-55% | 35-45% | 60-70% || 二进制数据 | 30-40% | 20-30% | 15-25% | 25-35% |性能对比| 算法 | 压缩速度 | 解压速度 | CPU 消耗 ||------|---------|---------|---------|| Gzip | 慢 | 慢 | 高 || Snappy | 快 | 快 | 低 || LZ4 | 最快 | 最快 | 极低 || Zstd | 较快 | 较快 | 中等 |压缩优化建议1. 选择合适的压缩算法根据数据类型选择:文本数据:Gzip 或 Zstd日志数据:Snappy 或 Zstd二进制数据:LZ4 或不压缩JSON 数据:Gzip 或 Zstd根据性能要求选择:高性能要求:LZ4 或 Snappy平衡性能和压缩率:Snappy 或 Zstd高压缩率要求:Gzip 或 Zstd2. 优化批量配置# 增加批量大小以提高压缩效果batch.size=32768# 增加等待时间以收集更多消息linger.ms=10# 调整最大请求大小max.request.size=10485763. 监控压缩效果监控指标:Record-Compression-Rate:压缩速率Byte-In-Rate:接收字节速率Byte-Out-Rate:发送字节速率Compression-Ratio:压缩比监控命令:# 查看 Producer 指标kafka-producer-perf-test --topic test-topic \ --num-records 10000 --record-size 1000 \ --throughput 10000 --producer-props \ compression.type=snappy压缩注意事项1. CPU 消耗压缩会增加 CPU 消耗需要评估 CPU 资源是否充足监控 CPU 使用率2. 延迟影响压缩会增加端到端延迟批次越大,延迟越高需要平衡延迟和压缩效果3. 内存使用压缩需要额外的内存缓冲区批次越大,内存消耗越大需要合理配置内存大小4. 兼容性确保所有 Broker 支持选定的压缩算法确保 Consumer 支持解压选定的压缩算法注意 Kafka 版本兼容性最佳实践1. 压缩算法选择默认推荐:Snappy(平衡性能和压缩率)高压缩率场景:Gzip 或 Zstd高性能场景:LZ4新版本 Kafka:优先使用 Zstd2. 批量配置# 推荐配置batch.size=32768linger.ms=10compression.type=snappy3. 监控和调优持续监控压缩效果根据监控数据调整配置进行压力测试验证效果4. 测试验证在测试环境验证压缩效果测试不同压缩算法的性能验证压缩后的数据完整性压缩示例Producer 示例Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("compression.type", "snappy");props.put("batch.size", "32768");props.put("linger.ms", "10");KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));}性能测试# 测试不同压缩算法的性能kafka-producer-perf-test --topic test-topic \ --num-records 100000 --record-size 1024 \ --throughput 100000 --producer-props \ compression.type=gzipkafka-producer-perf-test --topic test-topic \ --num-records 100000 --record-size 1024 \ --throughput 100000 --producer-props \ compression.type=snappykafka-producer-perf-test --topic test-topic \ --num-records 100000 --record-size 1024 \ --throughput 100000 --producer-props \ compression.type=lz4通过合理配置和使用 Kafka 的消息压缩功能,可以在保证性能的同时显著减少网络传输和存储成本,提高系统的整体效率。
阅读 0·2月21日 16:58

Kafka 消息丢失的原因是什么?如何解决?

Kafka 消息丢失原因及解决方案Kafka 在设计上通过多种机制保证消息不丢失,但在实际应用中,消息丢失仍可能发生。了解这些原因和解决方案对于构建可靠的系统至关重要。消息丢失的常见原因1. Producer 端丢失网络问题:消息发送过程中网络中断异步发送:使用异步发送时,Producer 在消息发送前就返回重试机制未配置:发送失败后没有重试缓冲区溢出:消息积压导致缓冲区满,消息被丢弃2. Broker 端丢失未刷盘:消息写入内存但未刷到磁盘就宕机副本不足:副本数设置为 1,Broker 宕机导致消息丢失副本同步延迟:Leader 收到消息但未同步到 Follower 就宕机磁盘故障:物理磁盘损坏导致数据丢失3. Consumer 端丢失自动提交 Offset:消息消费后但在处理完成前提交了 Offset处理失败:消息处理失败但 Offset 已提交异常退出:Consumer 异常退出导致未提交的消息重新消费解决方案Producer 端配置# 设置重试次数retries=3# 设置确认级别acks=all # Leader 和所有 ISR 中的 Follower 都确认# 启用幂等性enable.idempotence=true# 设置缓冲区大小buffer.memory=33554432# 设置批量发送大小batch.size=16384Broker 端配置# 设置副本数default.replication.factor=3# 设置最小同步副本数min.insync.replicas=2# 设置刷盘策略log.flush.interval.messages=10000log.flush.interval.ms=1000# 启用副本失效检测replica.lag.time.max.ms=30000Consumer 端配置# 禁用自动提交enable.auto.commit=false# 手动提交 Offset# 在消息处理完成后提交consumer.commitSync()# 设置合理的超时时间session.timeout.ms=30000最佳实践合理设置 acks 参数acks=0:不等待确认,性能最高但可能丢失acks=1:等待 Leader 确认,平衡性能和可靠性acks=all:等待所有 ISR 副本确认,最可靠但性能较低使用事务开启 Producer 事务支持确保消息要么全部成功,要么全部失败监控和告警监控消息积压情况监控 Consumer Lag设置合理的告警机制定期备份定期备份 Kafka 数据建立灾难恢复方案测试验证进行故障模拟测试验证消息不丢失机制的有效性性能与可靠性的权衡高可靠性配置会降低性能需要根据业务场景选择合适的配置对于关键业务数据,优先保证可靠性对于非关键数据,可以适当牺牲可靠性换取性能通过合理配置和监控,可以在大多数场景下有效避免 Kafka 消息丢失问题。
阅读 0·2月21日 16:58

Nginx 如何配置日志?有哪些日志格式和优化方法?

Nginx 如何配置日志?有哪些日志格式和优化方法?Nginx 日志对于监控、调试和安全审计非常重要。合理配置日志可以帮助快速定位问题和优化性能。访问日志配置:http { # 自定义日志格式 log_format main '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' '$request_time $upstream_response_time'; # 详细日志格式 log_format detailed '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' '$request_time $upstream_response_time ' '$upstream_addr $upstream_status ' '$http_x_forwarded_for $request_id'; # JSON 格式日志 log_format json_combined escape=json '{' '"time_local":"$time_local",' '"remote_addr":"$remote_addr",' '"remote_user":"$remote_user",' '"request":"$request",' '"status":"$status",' '"body_bytes_sent":"$body_bytes_sent",' '"request_time":"$request_time",' '"http_referrer":"$http_referer",' '"http_user_agent":"$http_user_agent"' '}'; # 应用日志格式 access_log /var/log/nginx/access.log main; server { listen 80; server_name example.com; # 使用不同的日志格式 access_log /var/log/nginx/example.com.access.log detailed; location / { proxy_pass http://backend; } }}错误日志配置:# 错误日志级别:debug, info, notice, warn, error, crit, alert, emergerror_log /var/log/nginx/error.log warn;# 不同级别的错误日志error_log /var/log/nginx/error.log info;error_log /var/log/nginx/crit.log crit;日志优化:http { # 禁用特定路径的日志 location ~* \.(css|js|jpg|jpeg|png|gif|ico|svg|woff|woff2)$ { access_log off; } # 禁用健康检查日志 location /health { access_log off; return 200 "OK"; } # 缓冲日志写入 access_log /var/log/nginx/access.log main buffer=32k flush=5s; # 压缩日志 access_log /var/log/nginx/access.log main gzip=9;}条件日志记录:http { # 根据状态码记录日志 map $status $loggable { ~^[23] 0; default 1; } # 根据请求方法记录日志 map $request_method $loggable_method { GET 1; POST 1; default 0; } server { listen 80; server_name example.com; # 只记录特定状态码的请求 access_log /var/log/nginx/access.log main if=$loggable; # 只记录特定方法的请求 access_log /var/log/nginx/method.log main if=$loggable_method; location / { proxy_pass http://backend; } }}分离日志:http { # API 日志 log_format api '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' 'rt=$request_time uct="$upstream_connect_time" ' 'uht="$upstream_header_time" urt="$upstream_response_time"'; server { listen 80; server_name example.com; # 主站点日志 access_log /var/log/nginx/main.log main; # API 日志 location /api/ { access_log /var/log/nginx/api.log api; proxy_pass http://api_backend; } # 静态资源不记录 location ~* \.(css|js|jpg|jpeg|png|gif|ico|svg|woff|woff2)$ { access_log off; root /var/www/static; } }}日志轮转:# /etc/logrotate.d/nginx/var/log/nginx/*.log { daily missingok rotate 14 compress delaycompress notifempty create 0640 nginx adm sharedscripts postrotate [ -f /var/run/nginx.pid ] && kill -USR1 `cat /var/run/nginx.pid` endscript}日志分析变量:log_format analysis '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' '$request_time $upstream_response_time ' '$upstream_addr $upstream_status ' '$scheme $server_name $request_uri ' '$http_host $http_x_forwarded_for ' '$request_id $connection $connections ' '$time_iso8601 $msec';常用日志变量:| 变量 | 说明 ||------|------|| $remoteaddr | 客户端 IP 地址 || $remoteuser | 认证用户名 || $timelocal | 本地时间 || $request | 完整的请求行 || $status | 响应状态码 || $bodybytessent | 发送的字节数 || $httpreferer | 来源页面 || $httpuseragent | 用户代理 || $requesttime | 请求处理时间 || $upstreamresponsetime | 上游响应时间 || $upstreamaddr | 上游服务器地址 || $upstreamstatus | 上游状态码 || $requestid | 请求 ID || $httpxforwarded_for | 真实客户端 IP |日志监控和告警:# 自定义错误日志格式log_format error_log_format '$time_local [$level] $message';# 记录慢请求log_format slow_request '$remote_addr - $remote_user [$time_local] ' '"$request" $status $request_time ' 'upstream_response_time=$upstream_response_time';http { # 慢请求阈值 map $request_time $slow_request { default 0; ~^([1-9]\d*\.?\d*|0\.\d*[1-9]\d*) 1; } server { listen 80; server_name example.com; # 记录慢请求 access_log /var/log/nginx/slow.log slow_request if=$slow_request; location / { proxy_pass http://backend; } }}日志安全:http { # 不记录敏感信息 log_format secure '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' '$request_time'; server { listen 80; server_name example.com; # 不记录密码等敏感信息 location /login { access_log /var/log/nginx/login.log secure; proxy_pass http://auth_backend; } # 不记录敏感路径 location ~* ^/(admin|api/v1/users) { access_log /var/log/nginx/sensitive.log secure; proxy_pass http://backend; } }}完整日志配置示例:user nginx;worker_processes auto;http { # 日志格式 log_format main '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' '$request_time $upstream_response_time'; log_format api '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' 'rt=$request_time uct="$upstream_connect_time" ' 'uht="$upstream_header_time" urt="$upstream_response_time" ' 'upstream_addr=$upstream_addr upstream_status=$upstream_status'; log_format json_combined escape=json '{' '"time_local":"$time_local",' '"remote_addr":"$remote_addr",' '"remote_user":"$remote_user",' '"request":"$request",' '"status":"$status",' '"body_bytes_sent":"$body_bytes_sent",' '"request_time":"$request_time",' '"http_referrer":"$http_referer",' '"http_user_agent":"$http_user_agent",' '"upstream_response_time":"$upstream_response_time"' '}'; # 全局日志 access_log /var/log/nginx/access.log main buffer=32k flush=5s; error_log /var/log/nginx/error.log warn; # 条件日志 map $status $loggable { ~^[23] 0; default 1; } # 慢请求 map $request_time $slow_request { default 0; ~^([1-9]\d*\.?\d*|0\.\d*[1-9]\d*) 1; } server { listen 80; server_name example.com; # 主日志 access_log /var/log/nginx/example.com.access.log main if=$loggable; # 慢请求日志 access_log /var/log/nginx/slow.log main if=$slow_request; # API 日志 location /api/ { access_log /var/log/nginx/api.log api; proxy_pass http://api_backend; } # 静态资源不记录 location ~* \.(css|js|jpg|jpeg|png|gif|ico|svg|woff|woff2)$ { access_log off; root /var/www/static; } # 健康检查不记录 location /health { access_log off; return 200 "OK"; } location / { proxy_pass http://backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }}日志分析工具:GoAccess:实时日志分析AWStats:Web 日志分析ELK Stack:Elasticsearch + Logstash + KibanaGrafana + Loki:日志监控和可视化Splunk:企业级日志分析日志优化建议:合理设置日志级别:生产环境使用 warn 或 error使用缓冲:减少磁盘 I/O定期轮转:避免日志文件过大压缩旧日志:节省磁盘空间分离日志:按业务类型分离条件记录:只记录必要的信息使用 JSON 格式:便于机器解析监控日志大小:防止磁盘占满
阅读 0·2月21日 16:57

Nginx 如何配置反向代理?

Nginx 如何配置反向代理?Nginx 反向代理是指 Nginx 服务器接收客户端请求,然后将请求转发到后端服务器,再将后端服务器的响应返回给客户端。客户端只知道 Nginx 服务器的存在,不知道实际处理请求的后端服务器。基本配置示例:server { listen 80; server_name example.com; location / { proxy_pass http://backend_server; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; }}upstream backend_server { server 192.168.1.100:8080; server 192.168.1.101:8080;}关键配置指令说明:proxy_pass:指定后端服务器地址,可以是 IP 地址、域名或 upstream 组名proxysetheader:设置转发给后端服务器的请求头Host:保留原始请求的主机名X-Real-IP:记录客户端真实 IPX-Forwarded-For:记录请求经过的代理链X-Forwarded-Proto:记录原始协议(http/https)upstream:定义后端服务器组,实现负载均衡常用反向代理配置选项:location /api/ { proxy_pass http://backend_api; # 超时设置 proxy_connect_timeout 60s; proxy_send_timeout 60s; proxy_read_timeout 60s; # 缓冲设置 proxy_buffering on; proxy_buffer_size 4k; proxy_buffers 8 4k; # WebSocket 支持 proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; # 禁用重定向 proxy_redirect off;}负载均衡策略:轮询(默认):按顺序分配请求least_conn:分配给活动连接数最少的服务器ip_hash:根据客户端 IP 进行哈希分配,保证同一 IP 的请求到同一服务器least_time:分配给响应时间最短的服务器(需要商业版)实际应用场景:将请求转发到多个应用服务器隐藏后端服务器真实 IP统一入口,简化客户端配置实现 SSL 终止缓存静态内容WebSocket 代理
阅读 0·2月21日 16:57

Nginx 如何配置 WebSocket 代理?

Nginx 如何配置 WebSocket 代理?WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。Nginx 可以作为 WebSocket 代理,将客户端的 WebSocket 连接转发到后端服务器。基本配置:map $http_upgrade $connection_upgrade { default upgrade; '' close;}server { listen 80; server_name example.com; location /ws { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 超时设置 proxy_connect_timeout 7d; proxy_send_timeout 7d; proxy_read_timeout 7d; }}关键配置说明:proxyhttpversion 1.1:WebSocket 需要 HTTP/1.1 协议Upgrade 和 Connection 头:告诉 Nginx 这是一个 WebSocket 连接超时设置:WebSocket 是长连接,需要设置较长的超时时间完整配置示例:http { upstream websocket_backend { server 192.168.1.100:8080; server 192.168.1.101:8080; } map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { listen 80; server_name example.com; # WebSocket 代理 location /ws { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 超时设置(根据实际需求调整) proxy_connect_timeout 60s; proxy_send_timeout 3600s; proxy_read_timeout 3600s; # 禁用缓冲 proxy_buffering off; } # 普通请求 location / { proxy_pass http://backend; proxy_set_header Host $host; } }}HTTPS WebSocket 配置:server { listen 443 ssl; server_name example.com; ssl_certificate /etc/nginx/ssl/example.com.crt; ssl_certificate_key /etc/nginx/ssl/example.com.key; location /ws { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; proxy_connect_timeout 7d; proxy_send_timeout 7d; proxy_read_timeout 7d; }}WebSocket 负载均衡:upstream websocket_backend { # 使用 ip_hash 保持会话 ip_hash; server 192.168.1.100:8080; server 192.168.1.101:8080; server 192.168.1.102:8080;}注意事项:会话保持:WebSocket 连接需要保持到同一台后端服务器,使用 ip_hash 或 sticky 模块超时设置:根据业务需求设置合适的超时时间缓冲:WebSocket 实时通信需要禁用缓冲防火墙:确保防火墙允许长连接负载均衡:避免使用轮询策略,会导致连接中断性能优化:# 增加 worker 连接数events { worker_connections 4096;}# 调整 keepaliveupstream websocket_backend { server 192.168.1.100:8080; keepalive 32;}# 优化 TCP 参数location /ws { proxy_pass http://websocket_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; # TCP 优化 proxy_socket_keepalive on; proxy_connect_timeout 60s; proxy_send_timeout 3600s; proxy_read_timeout 3600s;}多路径 WebSocket:# 不同路径转发到不同后端location /chat/ws { proxy_pass http://chat_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade;}location /notification/ws { proxy_pass http://notification_backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade;}监控和日志:# 自定义日志格式log_format websocket '$remote_addr - $remote_user [$time_local] ' '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' 'rt=$request_time uct="$upstream_connect_time" ' 'uht="$upstream_header_time" urt="$upstream_response_time"';access_log /var/log/nginx/websocket_access.log websocket;# 监控连接数location /nginx_status { stub_status on; access_log off;}故障排查:连接断开:检查超时设置是否合理无法连接:检查 Upgrade 和 Connection 头是否正确负载均衡问题:使用 ip_hash 保持会话性能问题:调整 worker_connections 和缓冲设置
阅读 0·2月21日 16:57