Mongoose 支持 MongoDB 的事务功能,允许在多个文档或集合之间执行原子性操作。事务是确保数据一致性的重要机制。
事务基础
启用事务支持
MongoDB 4.0+ 支持副本集上的事务,MongoDB 4.2+ 支持分片集群上的事务。
javascriptconst mongoose = require('mongoose'); // 连接到副本集 mongoose.connect('mongodb://localhost:27017/mydb?replicaSet=myReplicaSet');
创建会话
javascriptconst session = await mongoose.startSession();
基本事务操作
简单事务
javascriptconst session = await mongoose.startSession(); session.startTransaction(); try { // 执行操作 const user = await User.create([{ name: 'John' }], { session }); const account = await Account.create([{ userId: user[0]._id, balance: 100 }], { session }); // 提交事务 await session.commitTransaction(); console.log('Transaction committed'); } catch (error) { // 回滚事务 await session.abortTransaction(); console.log('Transaction aborted:', error); } finally { // 结束会话 session.endSession(); }
更新操作事务
javascriptconst session = await mongoose.startSession(); session.startTransaction(); try { // 转账操作 const fromAccount = await Account.findById(fromAccountId).session(session); const toAccount = await Account.findById(toAccountId).session(session); if (fromAccount.balance < amount) { throw new Error('Insufficient balance'); } fromAccount.balance -= amount; toAccount.balance += amount; await fromAccount.save({ session }); await toAccount.save({ session }); await session.commitTransaction(); } catch (error) { await session.abortTransaction(); throw error; } finally { session.endSession(); }
事务选项
读写关注
javascriptconst session = await mongoose.startSession({ defaultTransactionOptions: { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' }, readPreference: 'primary' } }); session.startTransaction({ readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } });
事务超时
javascriptsession.startTransaction({ maxTimeMS: 5000 // 5秒超时 });
并发控制
乐观锁
使用版本号实现乐观锁:
javascriptconst productSchema = new Schema({ name: String, stock: Number, version: { type: Number, default: 0 } }); productSchema.pre('save', function(next) { this.increment(); next(); }); // 更新时检查版本号 const product = await Product.findById(productId); product.stock -= quantity; try { await product.save(); } catch (error) { if (error.name === 'VersionError') { console.log('Document was modified by another process'); } }
悲观锁
使用 findOneAndUpdate 实现悲观锁:
javascriptconst session = await mongoose.startSession(); session.startTransaction(); try { // 查找并锁定文档 const product = await Product.findOneAndUpdate( { _id: productId, locked: false }, { locked: true }, { session, new: true } ); if (!product) { throw new Error('Product is locked or not found'); } // 执行操作 product.stock -= quantity; await product.save({ session }); // 释放锁 product.locked = false; await product.save({ session }); await session.commitTransaction(); } catch (error) { await session.abortTransaction(); throw error; } finally { session.endSession(); }
原子操作
原子更新
javascript// 原子递增 await User.findByIdAndUpdate(userId, { $inc: { balance: 100 } }); // 原子条件更新 await User.findOneAndUpdate( { _id: userId, balance: { $gte: 100 } }, { $inc: { balance: -100 } } ); // 原子数组操作 await User.findByIdAndUpdate(userId, { $push: { tags: 'new-tag' }, $addToSet: { uniqueTags: 'unique-tag' } });
批量原子操作
javascript// 批量更新 await User.updateMany( { status: 'active' }, { $set: { lastActive: new Date() } } ); // 批量删除 await User.deleteMany({ status: 'deleted' });
事务最佳实践
- 保持事务简短:事务时间越长,冲突概率越高
- 避免长时间持有锁:尽快释放资源
- 使用适当的隔离级别:根据业务需求选择
- 处理重试逻辑:实现自动重试机制
- 监控事务性能:跟踪事务执行时间和失败率
- 合理设计数据模型:减少跨文档事务的需求
javascript// 自动重试事务 async function runWithRetry(fn, maxRetries = 3) { for (let i = 0; i < maxRetries; i++) { try { return await fn(); } catch (error) { if (error.name === 'TransientTransactionError' && i < maxRetries - 1) { console.log(`Retrying transaction (attempt ${i + 1})`); await new Promise(resolve => setTimeout(resolve, 100 * (i + 1))); } else { throw error; } } } }
注意事项
- 事务只能在副本集或分片集群上使用
- 事务操作必须在同一个会话中执行
- 事务会带来性能开销,谨慎使用
- 避免在事务中执行耗时操作
- 确保正确处理事务错误和回滚
- 考虑使用原子操作替代简单事务