乐闻世界logo
搜索文章和话题

How does Deno's task system work?

2月21日 16:08

Deno's Task System provides a way to run asynchronous tasks in the background, similar to Web Workers in browsers. This feature is very useful for executing CPU-intensive tasks or scenarios requiring parallel processing.

Task System Overview

Deno's task system allows you to create independent worker threads that can execute code in parallel without blocking the main thread. Each task has its own memory space and communicates with the main thread through message passing.

Basic Usage

1. Create Simple Task

typescript
// main.ts const worker = new Worker(new URL("./worker.ts", import.meta.url).href, { type: "module", }); worker.postMessage({ type: "start", data: 42 }); worker.onmessage = (event) => { console.log("Received from worker:", event.data); worker.terminate(); }; worker.onerror = (error) => { console.error("Worker error:", error); };
typescript
// worker.ts self.onmessage = (event) => { console.log("Worker received:", event.data); const result = event.data.data * 2; self.postMessage({ type: "result", data: result }); };

Run:

bash
deno run --allow-read main.ts

2. Wrap Worker with Promise

typescript
// main.ts function runWorker<T>(workerFile: string, data: any): Promise<T> { return new Promise((resolve, reject) => { const worker = new Worker(new URL(workerFile, import.meta.url).href, { type: "module", }); worker.postMessage(data); worker.onmessage = (event) => { resolve(event.data); worker.terminate(); }; worker.onerror = (error) => { reject(error); worker.terminate(); }; }); } async function main() { try { const result = await runWorker<number>("./worker.ts", { number: 10 }); console.log("Result:", result); } catch (error) { console.error("Error:", error); } } main();
typescript
// worker.ts self.onmessage = (event) => { const { number } = event.data; // Simulate time-consuming calculation let result = 0; for (let i = 0; i < number * 1000000; i++) { result += i; } self.postMessage(result); };

Practical Application Examples

1. Image Processing

typescript
// image-processor.ts self.onmessage = async (event) => { const { imageData, operation } = event.data; let result; switch (operation) { case "grayscale": result = applyGrayscale(imageData); break; case "invert": result = applyInvert(imageData); break; case "blur": result = applyBlur(imageData); break; default: throw new Error(`Unknown operation: ${operation}`); } self.postMessage({ result }); }; function applyGrayscale(data: Uint8ClampedArray): Uint8ClampedArray { const result = new Uint8ClampedArray(data.length); for (let i = 0; i < data.length; i += 4) { const r = data[i]; const g = data[i + 1]; const b = data[i + 2]; const gray = 0.299 * r + 0.587 * g + 0.114 * b; result[i] = gray; result[i + 1] = gray; result[i + 2] = gray; result[i + 3] = data[i + 3]; } return result; } function applyInvert(data: Uint8ClampedArray): Uint8ClampedArray { const result = new Uint8ClampedArray(data.length); for (let i = 0; i < data.length; i += 4) { result[i] = 255 - data[i]; result[i + 1] = 255 - data[i + 1]; result[i + 2] = 255 - data[i + 2]; result[i + 3] = data[i + 3]; } return result; } function applyBlur(data: Uint8ClampedArray): Uint8ClampedArray { // Simplified blur algorithm return data; // Actual implementation would be more complex }
typescript
// main.ts import { runWorker } from "./worker-utils.ts"; async function processImage(imagePath: string) { const imageData = await Deno.readFile(imagePath); const grayscaleResult = await runWorker<Uint8ClampedArray>( "./image-processor.ts", { imageData, operation: "grayscale" } ); await Deno.writeFile(`${imagePath}.grayscale.png`, grayscaleResult); const invertResult = await runWorker<Uint8ClampedArray>( "./image-processor.ts", { imageData, operation: "invert" } ); await Deno.writeFile(`${imagePath}.invert.png`, invertResult); console.log("Image processing complete"); } processImage("input.png");

2. Parallel Data Processing

typescript
// data-processor.ts self.onmessage = (event) => { const { data, chunkIndex, totalChunks } = event.data; console.log(`Processing chunk ${chunkIndex}/${totalChunks}`); // Simulate data processing const processed = data.map((item: number) => ({ value: item, processed: true, timestamp: Date.now(), })); self.postMessage({ chunkIndex, processed }); };
typescript
// main.ts import { runWorker } from "./worker-utils.ts"; async function processDataInParallel(data: number[], chunkSize: number = 1000) { const chunks: number[][] = []; for (let i = 0; i < data.length; i += chunkSize) { chunks.push(data.slice(i, i + chunkSize)); } console.log(`Processing ${chunks.length} chunks in parallel`); const promises = chunks.map((chunk, index) => runWorker("./data-processor.ts", { data: chunk, chunkIndex: index, totalChunks: chunks.length, }) ); const results = await Promise.all(promises); // Merge results const processedData = results .sort((a, b) => a.chunkIndex - b.chunkIndex) .flatMap((result) => result.processed); console.log(`Processed ${processedData.length} items`); return processedData; } // Generate test data const testData = Array.from({ length: 10000 }, (_, i) => i); processDataInParallel(testData, 1000);

3. Batch File Processing

typescript
// file-processor.ts self.onmessage = async (event) => { const { filePath, operation } = event.data; try { const content = await Deno.readTextFile(filePath); let result: string; switch (operation) { case "uppercase": result = content.toUpperCase(); break; case "lowercase": result = content.toLowerCase(); break; case "reverse": result = content.split("").reverse().join(""); break; case "count": result = String(content.length); break; default: throw new Error(`Unknown operation: ${operation}`); } self.postMessage({ filePath, result, success: true }); } catch (error) { self.postMessage({ filePath, error: error.message, success: false }); } };
typescript
// main.ts import { runWorker } from "./worker-utils.ts"; async function processFilesInParallel( files: string[], operation: string ) { console.log(`Processing ${files.length} files with operation: ${operation}`); const promises = files.map((file) => runWorker("./file-processor.ts", { filePath: file, operation }) ); const results = await Promise.all(promises); results.forEach((result) => { if (result.success) { console.log(`${result.filePath}: ${result.result.substring(0, 50)}...`); } else { console.error(`${result.filePath}: ${result.error}`); } }); return results; } // Get all .txt files in current directory const files = Array.from(Deno.readDirSync(".")) .filter((entry) => entry.isFile && entry.name.endsWith(".txt")) .map((entry) => entry.name); processFilesInParallel(files, "uppercase");

4. Password Hash Calculation

typescript
// password-hasher.ts self.onmessage = async (event) => { const { password, algorithm = "SHA-256" } = event.data; const encoder = new TextEncoder(); const data = encoder.encode(password); const hashBuffer = await crypto.subtle.digest(algorithm, data); const hashArray = Array.from(new Uint8Array(hashBuffer)); const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join(""); self.postMessage({ password, hash: hashHex, algorithm }); };
typescript
// main.ts import { runWorker } from "./worker-utils.ts"; async function hashPasswords(passwords: string[]) { console.log(`Hashing ${passwords.length} passwords`); const promises = passwords.map((password) => runWorker("./password-hasher.ts", { password }) ); const results = await Promise.all(promises); results.forEach((result) => { console.log(`${result.password}: ${result.hash}`); }); return results; } const passwords = ["password123", "admin", "user123", "test"]; hashPasswords(passwords);

Advanced Usage

1. Worker Pool

typescript
// worker-pool.ts export class WorkerPool { private workers: Worker[] = []; private taskQueue: Array<{ data: any; resolve: (value: any) => void; reject: (error: any) => void }> = []; private maxWorkers: number; constructor(workerFile: string, maxWorkers: number = 4) { this.maxWorkers = maxWorkers; for (let i = 0; i < maxWorkers; i++) { const worker = new Worker(new URL(workerFile, import.meta.url).href, { type: "module", }); worker.onmessage = (event) => { const task = this.taskQueue.shift(); if (task) { task.resolve(event.data); this.assignNextTask(worker); } }; worker.onerror = (error) => { const task = this.taskQueue.shift(); if (task) { task.reject(error); this.assignNextTask(worker); } }; this.workers.push(worker); } } private assignNextTask(worker: Worker) { const task = this.taskQueue[0]; if (task) { worker.postMessage(task.data); } } async execute(data: any): Promise<any> { return new Promise((resolve, reject) => { this.taskQueue.push({ data, resolve, reject }); // Find idle worker const idleWorker = this.workers.find((w) => !this.taskQueue.includes(w)); if (idleWorker) { this.assignNextTask(idleWorker); } }); } terminate() { this.workers.forEach((worker) => worker.terminate()); this.workers = []; } }

Using Worker Pool:

typescript
// main.ts import { WorkerPool } from "./worker-pool.ts"; const pool = new WorkerPool("./data-processor.ts", 4); async function processWithPool(data: number[]) { const promises = data.map((item) => pool.execute({ data: item })); const results = await Promise.all(promises); pool.terminate(); return results; } processWithPool([1, 2, 3, 4, 5, 6, 7, 8]);

2. Error Handling and Retry

typescript
// worker-with-retry.ts export async function runWorkerWithRetry<T>( workerFile: string, data: any, maxRetries: number = 3 ): Promise<T> { let lastError: Error | undefined; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { return await runWorker<T>(workerFile, data); } catch (error) { lastError = error as Error; console.error(`Attempt ${attempt} failed: ${error.message}`); if (attempt < maxRetries) { await new Promise((resolve) => setTimeout(resolve, 1000 * attempt)); } } } throw lastError; }

Best Practices

  1. Use Workers appropriately: Only use Workers for CPU-intensive tasks
  2. Control concurrency: Limit the number of concurrently running Workers
  3. Clean up properly: Terminate Workers after use
  4. Error handling: Handle Worker errors properly
  5. Message size: Avoid passing overly large messages
  6. Type safety: Use TypeScript to ensure correct message types

Deno's task system provides powerful support for parallel processing and background tasks, significantly improving application performance and responsiveness.

标签:Deno