乐闻世界logo
搜索文章和话题

Mongoose 如何处理事务和并发控制?

2月22日 20:08

Mongoose 支持 MongoDB 的事务功能,允许在多个文档或集合之间执行原子性操作。事务是确保数据一致性的重要机制。

事务基础

启用事务支持

MongoDB 4.0+ 支持副本集上的事务,MongoDB 4.2+ 支持分片集群上的事务。

javascript
const mongoose = require('mongoose'); // 连接到副本集 mongoose.connect('mongodb://localhost:27017/mydb?replicaSet=myReplicaSet');

创建会话

javascript
const session = await mongoose.startSession();

基本事务操作

简单事务

javascript
const session = await mongoose.startSession(); session.startTransaction(); try { // 执行操作 const user = await User.create([{ name: 'John' }], { session }); const account = await Account.create([{ userId: user[0]._id, balance: 100 }], { session }); // 提交事务 await session.commitTransaction(); console.log('Transaction committed'); } catch (error) { // 回滚事务 await session.abortTransaction(); console.log('Transaction aborted:', error); } finally { // 结束会话 session.endSession(); }

更新操作事务

javascript
const session = await mongoose.startSession(); session.startTransaction(); try { // 转账操作 const fromAccount = await Account.findById(fromAccountId).session(session); const toAccount = await Account.findById(toAccountId).session(session); if (fromAccount.balance < amount) { throw new Error('Insufficient balance'); } fromAccount.balance -= amount; toAccount.balance += amount; await fromAccount.save({ session }); await toAccount.save({ session }); await session.commitTransaction(); } catch (error) { await session.abortTransaction(); throw error; } finally { session.endSession(); }

事务选项

读写关注

javascript
const session = await mongoose.startSession({ defaultTransactionOptions: { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' }, readPreference: 'primary' } }); session.startTransaction({ readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } });

事务超时

javascript
session.startTransaction({ maxTimeMS: 5000 // 5秒超时 });

并发控制

乐观锁

使用版本号实现乐观锁:

javascript
const productSchema = new Schema({ name: String, stock: Number, version: { type: Number, default: 0 } }); productSchema.pre('save', function(next) { this.increment(); next(); }); // 更新时检查版本号 const product = await Product.findById(productId); product.stock -= quantity; try { await product.save(); } catch (error) { if (error.name === 'VersionError') { console.log('Document was modified by another process'); } }

悲观锁

使用 findOneAndUpdate 实现悲观锁:

javascript
const session = await mongoose.startSession(); session.startTransaction(); try { // 查找并锁定文档 const product = await Product.findOneAndUpdate( { _id: productId, locked: false }, { locked: true }, { session, new: true } ); if (!product) { throw new Error('Product is locked or not found'); } // 执行操作 product.stock -= quantity; await product.save({ session }); // 释放锁 product.locked = false; await product.save({ session }); await session.commitTransaction(); } catch (error) { await session.abortTransaction(); throw error; } finally { session.endSession(); }

原子操作

原子更新

javascript
// 原子递增 await User.findByIdAndUpdate(userId, { $inc: { balance: 100 } }); // 原子条件更新 await User.findOneAndUpdate( { _id: userId, balance: { $gte: 100 } }, { $inc: { balance: -100 } } ); // 原子数组操作 await User.findByIdAndUpdate(userId, { $push: { tags: 'new-tag' }, $addToSet: { uniqueTags: 'unique-tag' } });

批量原子操作

javascript
// 批量更新 await User.updateMany( { status: 'active' }, { $set: { lastActive: new Date() } } ); // 批量删除 await User.deleteMany({ status: 'deleted' });

事务最佳实践

  1. 保持事务简短:事务时间越长,冲突概率越高
  2. 避免长时间持有锁:尽快释放资源
  3. 使用适当的隔离级别:根据业务需求选择
  4. 处理重试逻辑:实现自动重试机制
  5. 监控事务性能:跟踪事务执行时间和失败率
  6. 合理设计数据模型:减少跨文档事务的需求
javascript
// 自动重试事务 async function runWithRetry(fn, maxRetries = 3) { for (let i = 0; i < maxRetries; i++) { try { return await fn(); } catch (error) { if (error.name === 'TransientTransactionError' && i < maxRetries - 1) { console.log(`Retrying transaction (attempt ${i + 1})`); await new Promise(resolve => setTimeout(resolve, 100 * (i + 1))); } else { throw error; } } } }

注意事项

  1. 事务只能在副本集或分片集群上使用
  2. 事务操作必须在同一个会话中执行
  3. 事务会带来性能开销,谨慎使用
  4. 避免在事务中执行耗时操作
  5. 确保正确处理事务错误和回滚
  6. 考虑使用原子操作替代简单事务
标签:Mongoose