How to implement Promise concurrency control?

February 22, 14:07

Promise concurrency control is an important performance optimization technique that allows us to limit the number of simultaneously executing asynchronous operations, avoiding resource exhaustion or performance degradation.

Why Concurrency Control is Needed

Problem Scenario

```javascript
// Not recommended: initiate many requests simultaneously
async function fetchAllUrls(urls) {
  const promises = urls.map(url => fetch(url));
  const results = await Promise.all(promises);
  return results;
}

// Problems:
// 1. May cause browser or server resource exhaustion
// 2. Network bandwidth may be saturated
// 3. May trigger server rate limiting
// 4. High memory usage
```

Basic Concurrency Control Implementation

1. Using p-limit Library

```javascript
import pLimit from 'p-limit';

async function fetchWithLimit(urls, concurrency = 5) {
  const limit = pLimit(concurrency);
  const promises = urls.map(url =>
    limit(() => fetch(url))
  );
  const results = await Promise.all(promises);
  return results;
}
```

2. Manual Concurrency Control Implementation

```javascript
async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = [];       // every task promise, in input order
  const executing = []; // trackers for tasks still in flight

  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item));
    ret.push(p);

    // Throttling only matters when there are at least as many items as slots
    if (poolLimit <= array.length) {
      // When p fulfills, remove its tracker from the in-flight list
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        // Pool is full: wait until one in-flight task finishes
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

// Usage example
async function fetchWithConcurrency(urls, concurrency = 5) {
  return asyncPool(concurrency, urls, url => fetch(url));
}
```
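As a point of comparison with `asyncPool`, the same limit can be expressed as a fixed set of "runner" loops that each pull the next item from a shared index. The sketch below is a different technique, not the article's implementation, and `mapWithLimit` is a hypothetical helper name. It assumes `worker` returns a promise and that results should come back in input order.

```javascript
// Sketch: apply `worker(item, index)` to every item with at most `limit` in flight.
// `mapWithLimit` is a hypothetical name, not a library API.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each runner keeps claiming the next unprocessed index until none remain.
  async function runner() {
    while (nextIndex < items.length) {
      const index = nextIndex++; // no await between the check and the claim
      results[index] = await worker(items[index], index);
    }
  }

  // Never start more runners than there are items, and always at least one.
  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Possible usage: mapWithLimit(urls, 5, url => fetch(url))
```

Because each runner holds exactly one task at a time, the number of in-flight tasks can never exceed the number of runners. If one `worker` call rejects, the returned promise rejects, but the other runners keep processing the remaining items in the background.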

3. Queue Implementation

```javascript
class ConcurrencyControl {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.queue = [];  // resolvers of callers waiting for a free slot
    this.running = 0; // number of tasks currently executing
  }

  async run(task) {
    // Re-check after waking: another caller may have taken the freed slot
    while (this.running >= this.concurrency) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.running++;
    try {
      return await task();
    } finally {
      this.running--;
      const next = this.queue.shift();
      if (next) next(); // wake one waiting caller
    }
  }
}

// Usage example
async function fetchWithControl(urls, concurrency = 5) {
  const control = new ConcurrencyControl(concurrency);
  const promises = urls.map(url =>
    control.run(() => fetch(url))
  );
  return Promise.all(promises);
}
```

Advanced Concurrency Control Implementation

1. Concurrency Control with Retry Mechanism

```javascript
class ConcurrencyControlWithRetry {
  constructor(concurrency, maxRetries = 3) {
    this.concurrency = concurrency;
    this.maxRetries = maxRetries; // total attempts, including the first
    this.queue = [];
    this.running = 0;
  }

  async run(task) {
    while (this.running >= this.concurrency) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.running++;
    try {
      let lastError;
      for (let i = 0; i < this.maxRetries; i++) {
        try {
          return await task();
        } catch (error) {
          lastError = error;
          if (i < this.maxRetries - 1) {
            // Linear backoff: wait 1 s, 2 s, ... before the next attempt
            await new Promise(resolve =>
              setTimeout(resolve, 1000 * (i + 1))
            );
          }
        }
      }
      throw lastError;
    } finally {
      // Release the slot on success and on final failure alike;
      // without this, waiting tasks would never start
      this.running--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// Usage example
async function fetchWithRetry(urls, concurrency = 5) {
  const control = new ConcurrencyControlWithRetry(concurrency, 3);
  const promises = urls.map(url =>
    control.run(() => fetch(url))
  );
  return Promise.all(promises);
}
```
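The retry class spaces attempts linearly (`1000 * (i + 1)` milliseconds). A common alternative is exponential backoff with random jitter, so that many failing clients do not retry in lockstep. The following is a minimal sketch of that alternative under stated assumptions: `retryWithBackoff` and its option names (`maxAttempts`, `baseDelayMs`, `maxDelayMs`) are hypothetical and do not come from the article or from any library.

```javascript
// Hypothetical helper: retry `task` with exponential backoff and full jitter.
async function retryWithBackoff(
  task,
  { maxAttempts = 3, baseDelayMs = 1000, maxDelayMs = 30000 } = {}
) {
  if (maxAttempts < 1) {
    throw new RangeError('maxAttempts must be at least 1');
  }
  let lastError;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      lastError = error;
      if (attempt === maxAttempts - 1) break; // no delay after the final attempt
      // The ceiling doubles on each attempt, capped at maxDelayMs
      const ceiling = Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
      // Full jitter: pick a random delay between 0 and the ceiling
      const delay = Math.random() * ceiling;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Possible usage inside a limiter:
// control.run(() => retryWithBackoff(() => fetch(url)))
```

Wrapping the retry inside the limited task keeps the slot occupied during the backoff delay, which matches the behavior of the class above; releasing the slot between attempts would be a different design.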

2. Concurrency Control with Timeout

```javascript
class ConcurrencyControlWithTimeout {
  constructor(concurrency, timeout = 5000) {
    this.concurrency = concurrency;
    this.timeout = timeout; // milliseconds
    this.queue = [];
    this.running = 0;
  }

  async run(task) {
    while (this.running >= this.concurrency) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.running++;
    let timer;
    try {
      // Note: losing the race does not cancel the task itself;
      // it keeps running even though its slot is released
      return await Promise.race([
        task(),
        new Promise((_, reject) => {
          timer = setTimeout(() => reject(new Error('Timeout')), this.timeout);
        })
      ]);
    } finally {
      clearTimeout(timer); // do not leave a pending timer behind
      this.running--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// Usage example
async function fetchWithTimeout(urls, concurrency = 5) {
  const control = new ConcurrencyControlWithTimeout(concurrency, 5000);
  const promises = urls.map(url =>
    control.run(() => fetch(url))
  );
  return Promise.all(promises);
}
```
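`Promise.race` only stops waiting for the task; the underlying request continues. When the task accepts an `AbortSignal` (as `fetch` does through its `signal` option), the timeout can cancel the work as well. The sketch below assumes a runtime with `AbortController`; `runWithAbortTimeout` is a hypothetical helper name, and the approach only works if the task actually honors the signal.

```javascript
// Hypothetical helper: run `task(signal)` and abort it after `ms` milliseconds.
// If the task ignores the signal, this helper cannot interrupt it.
async function runWithAbortTimeout(task, ms) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    return await task(controller.signal);
  } finally {
    clearTimeout(timer); // the task settled first, or was aborted
  }
}

// Possible usage with fetch, which rejects once its signal is aborted:
// control.run(() => runWithAbortTimeout(signal => fetch(url, { signal }), 5000))
```

With this shape the slot is released only after the aborted task has actually settled, so the limiter's count stays closer to the real number of in-flight operations.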

3. Concurrency Control with Priority

```javascript
class PriorityConcurrencyControl {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.queue = []; // waiting callers, highest priority first
    this.running = 0;
  }

  async run(task, priority = 0) {
    // Only callers that actually have to wait are queued, so tasks that
    // start immediately never leave stale entries behind
    while (this.running >= this.concurrency) {
      await new Promise(resolve => {
        this.queue.push({ priority, resolve });
        // Stable sort: equal priorities keep their arrival order
        this.queue.sort((a, b) => b.priority - a.priority);
      });
    }
    this.running++;
    try {
      return await task();
    } finally {
      this.running--;
      const next = this.queue.shift(); // highest-priority waiter
      if (next) next.resolve();
    }
  }
}

// Usage example
async function fetchWithPriority(urls, concurrency = 5) {
  const control = new PriorityConcurrencyControl(concurrency);
  const promises = urls.map((url, index) =>
    control.run(() => fetch(url), index % 3)
  );
  return Promise.all(promises);
}
```

Practical Use Cases

1. Batch File Downloads

```javascript
async function downloadFiles(urls, concurrency = 3) {
  const control = new ConcurrencyControl(concurrency);
  const results = await Promise.all(
    urls.map(url =>
      control.run(async () => {
        const response = await fetch(url);
        const blob = await response.blob();
        return { url, blob };
      })
    )
  );
  return results;
}
```

2. Batch Database Query Processing

```javascript
// `database` stands for an existing client object that exposes execute(query)
async function processQueries(queries, concurrency = 5) {
  const control = new ConcurrencyControl(concurrency);
  const results = await Promise.all(
    queries.map(query =>
      control.run(() => database.execute(query))
    )
  );
  return results;
}
```

3. Batch Email Sending

```javascript
// `emailService` stands for an existing client object that exposes send(recipient)
async function sendEmails(recipients, concurrency = 10) {
  const control = new ConcurrencyControl(concurrency);
  // allSettled: one failed send must not discard the other results
  const results = await Promise.allSettled(
    recipients.map(recipient =>
      control.run(() => emailService.send(recipient))
    )
  );

  const successful = results.filter(r => r.status === 'fulfilled').length;
  const failed = results.filter(r => r.status === 'rejected').length;
  console.log(`Sending completed: ${successful} successful, ${failed} failed`);
  return results;
}
```

Performance Optimization Techniques

1. Dynamic Concurrency Adjustment

```javascript
class AdaptiveConcurrencyControl {
  constructor(initialConcurrency = 5, maxConcurrency = 20) {
    this.concurrency = initialConcurrency;
    this.maxConcurrency = maxConcurrency;
    this.minConcurrency = 1;
    this.queue = [];
    this.running = 0;
    this.successCount = 0;
    this.errorCount = 0;
  }

  async run(task) {
    while (this.running >= this.concurrency) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.running++;
    try {
      const result = await task();
      this.successCount++;
      this.adjustConcurrency();
      return result;
    } catch (error) {
      this.errorCount++;
      this.adjustConcurrency();
      throw error;
    } finally {
      this.running--;
      // Wake one waiter per free slot so that a raised limit takes effect
      let free = this.concurrency - this.running;
      while (free-- > 0 && this.queue.length > 0) {
        this.queue.shift()();
      }
    }
  }

  adjustConcurrency() {
    // The error rate is cumulative over every task seen so far
    const total = this.successCount + this.errorCount;
    const errorRate = this.errorCount / total;
    if (errorRate < 0.1 && this.concurrency < this.maxConcurrency) {
      this.concurrency = Math.min(this.concurrency + 1, this.maxConcurrency);
    } else if (errorRate > 0.3 && this.concurrency > this.minConcurrency) {
      this.concurrency = Math.max(this.concurrency - 1, this.minConcurrency);
    }
  }
}
```

2. Progress Monitoring

```javascript
class ConcurrencyControlWithProgress {
  constructor(concurrency, onProgress) {
    this.concurrency = concurrency;
    this.onProgress = onProgress; // called as onProgress(completed, total)
    this.queue = [];
    this.running = 0;
    this.completed = 0;
    this.total = 0;
  }

  async run(task) {
    this.total++;
    while (this.running >= this.concurrency) {
      await new Promise(resolve => this.queue.push(resolve));
    }
    this.running++;
    try {
      const result = await task();
      return result;
    } finally {
      // Failed tasks also count as completed
      this.completed++;
      this.onProgress(this.completed, this.total);
      this.running--;
      const next = this.queue.shift();
      if (next) next();
    }
  }
}

// Usage example
async function fetchWithProgress(urls, concurrency = 5) {
  const control = new ConcurrencyControlWithProgress(concurrency, (completed, total) => {
    console.log(`Progress: ${completed}/${total} (${(completed / total * 100).toFixed(1)}%)`);
  });
  const promises = urls.map(url =>
    control.run(() => fetch(url))
  );
  return Promise.all(promises);
}
```

Common Questions

1. How to Choose the Right Concurrency Number?

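```javascript
// Choose based on estimated bandwidth (Network Information API, not available everywhere)
function getOptimalConcurrency() {
  const nav = typeof navigator !== 'undefined' ? navigator : {};
  const connection = nav.connection || nav.mozConnection || nav.webkitConnection;
  if (connection && connection.downlink) {
    // downlink is in Mbit/s and may be fractional: round and clamp to 1..10
    return Math.max(1, Math.min(Math.round(connection.downlink), 10));
  }
  return 4; // Default value
}
```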

2. How to Handle Failed Tasks?

```javascript
async function fetchWithPartialFailure(urls, concurrency = 5) {
  const control = new ConcurrencyControl(concurrency);
  const results = await Promise.allSettled(
    urls.map(url =>
      control.run(() => fetch(url))
    )
  );

  const successful = results.filter(r => r.status === 'fulfilled');
  const failed = results.filter(r => r.status === 'rejected');
  console.log(`Successful: ${successful.length}, Failed: ${failed.length}`);
  return { successful, failed };
}
```
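One caveat when counting failures this way: `fetch` rejects only on network-level errors, so a response with status 404 or 500 still lands in the `fulfilled` group. A small wrapper can turn such responses into rejections. This is a sketch under assumptions: `fetchOrThrow` is a hypothetical helper, and its `fetchImpl` parameter exists only so the function can be exercised without a network.

```javascript
// Hypothetical helper: reject on HTTP error statuses so that
// Promise.allSettled reports them as 'rejected'.
async function fetchOrThrow(url, fetchImpl = fetch) {
  const response = await fetchImpl(url);
  if (!response.ok) {
    // response.ok is true only for statuses in the 200-299 range
    throw new Error(`HTTP ${response.status} for ${url}`);
  }
  return response;
}

// Possible usage: control.run(() => fetchOrThrow(url))
```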

Summary

  1. Avoid resource exhaustion: Limit the number of simultaneously executing asynchronous operations
  2. Improve performance: A well-chosen limit keeps throughput high without overloading the client, the network, or the server
  3. Flexible implementation: Implement different concurrency control strategies based on needs
  4. Error handling: Combine with retry, timeout and other mechanisms to improve reliability
  5. Progress monitoring: Monitor task execution progress in real-time
  6. Dynamic adjustment: Dynamically adjust concurrency based on actual conditions
Tags: Promise