Deno supports the standard Web Worker API for running asynchronous work in background threads — the same `Worker` interface browsers provide. (Note: this is distinct from `deno task`, which is Deno's script runner.) Workers are very useful for executing CPU-intensive tasks or scenarios requiring parallel processing.
Worker System Overview
Deno's worker support allows you to create independent worker threads that can execute code in parallel without blocking the main thread. Each worker runs in its own isolate with its own memory space and communicates with the main thread through message passing (structured clone).
Basic Usage
1. Create Simple Task
typescript// main.ts const worker = new Worker(new URL("./worker.ts", import.meta.url).href, { type: "module", }); worker.postMessage({ type: "start", data: 42 }); worker.onmessage = (event) => { console.log("Received from worker:", event.data); worker.terminate(); }; worker.onerror = (error) => { console.error("Worker error:", error); };
// worker.ts
// Replies to each incoming message with double the numeric payload.
self.onmessage = (event) => {
  console.log("Worker received:", event.data);
  const doubled = event.data.data * 2;
  self.postMessage({ type: "result", data: doubled });
};
Run:
deno run --allow-read main.ts
2. Wrap Worker with Promise
typescript// main.ts function runWorker<T>(workerFile: string, data: any): Promise<T> { return new Promise((resolve, reject) => { const worker = new Worker(new URL(workerFile, import.meta.url).href, { type: "module", }); worker.postMessage(data); worker.onmessage = (event) => { resolve(event.data); worker.terminate(); }; worker.onerror = (error) => { reject(error); worker.terminate(); }; }); } async function main() { try { const result = await runWorker<number>("./worker.ts", { number: 10 }); console.log("Result:", result); } catch (error) { console.error("Error:", error); } } main();
// worker.ts
// Receives { number } and replies with the sum 0 + 1 + ... + (number*1000000 - 1).
self.onmessage = (event) => {
  const { number } = event.data;
  // Simulate time-consuming calculation
  const iterations = number * 1000000;
  let total = 0;
  for (let i = 0; i < iterations; i++) {
    total += i;
  }
  self.postMessage(total);
};
Practical Application Examples
1. Image Processing
// image-processor.ts
// Worker that applies a pixel operation to RGBA byte data (4 bytes per
// pixel) and posts the transformed bytes back as { result }.

// Luminance-weighted grayscale (Rec. 601 coefficients); alpha preserved.
// Uint8ClampedArray rounds and clamps the fractional gray value on store.
function applyGrayscale(data: Uint8ClampedArray): Uint8ClampedArray {
  const out = new Uint8ClampedArray(data.length);
  for (let px = 0; px < data.length; px += 4) {
    const gray = 0.299 * data[px] + 0.587 * data[px + 1] + 0.114 * data[px + 2];
    out[px] = gray;
    out[px + 1] = gray;
    out[px + 2] = gray;
    out[px + 3] = data[px + 3];
  }
  return out;
}

// Inverts each color channel; alpha preserved.
function applyInvert(data: Uint8ClampedArray): Uint8ClampedArray {
  const out = new Uint8ClampedArray(data.length);
  for (let px = 0; px < data.length; px += 4) {
    out[px] = 255 - data[px];
    out[px + 1] = 255 - data[px + 1];
    out[px + 2] = 255 - data[px + 2];
    out[px + 3] = data[px + 3];
  }
  return out;
}

// Placeholder — a real blur needs image width/height to sample neighbors.
function applyBlur(data: Uint8ClampedArray): Uint8ClampedArray {
  // Simplified blur algorithm
  return data; // Actual implementation would be more complex
}

self.onmessage = async (event) => {
  const { imageData, operation } = event.data;
  let result;
  switch (operation) {
    case "grayscale":
      result = applyGrayscale(imageData);
      break;
    case "invert":
      result = applyInvert(imageData);
      break;
    case "blur":
      result = applyBlur(imageData);
      break;
    default:
      // Propagates to the main thread's worker.onerror handler.
      throw new Error(`Unknown operation: ${operation}`);
  }
  self.postMessage({ result });
};
typescript// main.ts import { runWorker } from "./worker-utils.ts"; async function processImage(imagePath: string) { const imageData = await Deno.readFile(imagePath); const grayscaleResult = await runWorker<Uint8ClampedArray>( "./image-processor.ts", { imageData, operation: "grayscale" } ); await Deno.writeFile(`${imagePath}.grayscale.png`, grayscaleResult); const invertResult = await runWorker<Uint8ClampedArray>( "./image-processor.ts", { imageData, operation: "invert" } ); await Deno.writeFile(`${imagePath}.invert.png`, invertResult); console.log("Image processing complete"); } processImage("input.png");
2. Parallel Data Processing
// data-processor.ts
// Tags every number in the received chunk with processing metadata and
// posts the annotated chunk back along with its index.
self.onmessage = (event) => {
  const { data, chunkIndex, totalChunks } = event.data;
  console.log(`Processing chunk ${chunkIndex}/${totalChunks}`);

  // Simulate data processing
  const processed = data.map((item: number) => ({
    value: item,
    processed: true,
    timestamp: Date.now(),
  }));

  self.postMessage({ chunkIndex, processed });
};
typescript// main.ts import { runWorker } from "./worker-utils.ts"; async function processDataInParallel(data: number[], chunkSize: number = 1000) { const chunks: number[][] = []; for (let i = 0; i < data.length; i += chunkSize) { chunks.push(data.slice(i, i + chunkSize)); } console.log(`Processing ${chunks.length} chunks in parallel`); const promises = chunks.map((chunk, index) => runWorker("./data-processor.ts", { data: chunk, chunkIndex: index, totalChunks: chunks.length, }) ); const results = await Promise.all(promises); // Merge results const processedData = results .sort((a, b) => a.chunkIndex - b.chunkIndex) .flatMap((result) => result.processed); console.log(`Processed ${processedData.length} items`); return processedData; } // Generate test data const testData = Array.from({ length: 10000 }, (_, i) => i); processDataInParallel(testData, 1000);
3. Batch File Processing
typescript// file-processor.ts self.onmessage = async (event) => { const { filePath, operation } = event.data; try { const content = await Deno.readTextFile(filePath); let result: string; switch (operation) { case "uppercase": result = content.toUpperCase(); break; case "lowercase": result = content.toLowerCase(); break; case "reverse": result = content.split("").reverse().join(""); break; case "count": result = String(content.length); break; default: throw new Error(`Unknown operation: ${operation}`); } self.postMessage({ filePath, result, success: true }); } catch (error) { self.postMessage({ filePath, error: error.message, success: false }); } };
typescript// main.ts import { runWorker } from "./worker-utils.ts"; async function processFilesInParallel( files: string[], operation: string ) { console.log(`Processing ${files.length} files with operation: ${operation}`); const promises = files.map((file) => runWorker("./file-processor.ts", { filePath: file, operation }) ); const results = await Promise.all(promises); results.forEach((result) => { if (result.success) { console.log(`✓ ${result.filePath}: ${result.result.substring(0, 50)}...`); } else { console.error(`✗ ${result.filePath}: ${result.error}`); } }); return results; } // Get all .txt files in current directory const files = Array.from(Deno.readDirSync(".")) .filter((entry) => entry.isFile && entry.name.endsWith(".txt")) .map((entry) => entry.name); processFilesInParallel(files, "uppercase");
4. Password Hash Calculation
// password-hasher.ts
// Hashes the given password with Web Crypto and posts back the hex digest.
// NOTE(review): a single plain SHA-256 digest is NOT suitable for storing
// real passwords — use a slow, salted KDF (bcrypt/scrypt/Argon2) for that.
self.onmessage = async (event) => {
  const { password, algorithm = "SHA-256" } = event.data;

  const bytes = new TextEncoder().encode(password);
  const digest = await crypto.subtle.digest(algorithm, bytes);

  // Render each byte as two lowercase hex characters.
  const hashHex = Array.from(new Uint8Array(digest))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("");

  self.postMessage({ password, hash: hashHex, algorithm });
};
typescript// main.ts import { runWorker } from "./worker-utils.ts"; async function hashPasswords(passwords: string[]) { console.log(`Hashing ${passwords.length} passwords`); const promises = passwords.map((password) => runWorker("./password-hasher.ts", { password }) ); const results = await Promise.all(promises); results.forEach((result) => { console.log(`${result.password}: ${result.hash}`); }); return results; } const passwords = ["password123", "admin", "user123", "test"]; hashPasswords(passwords);
Advanced Usage
1. Worker Pool
typescript// worker-pool.ts export class WorkerPool { private workers: Worker[] = []; private taskQueue: Array<{ data: any; resolve: (value: any) => void; reject: (error: any) => void }> = []; private maxWorkers: number; constructor(workerFile: string, maxWorkers: number = 4) { this.maxWorkers = maxWorkers; for (let i = 0; i < maxWorkers; i++) { const worker = new Worker(new URL(workerFile, import.meta.url).href, { type: "module", }); worker.onmessage = (event) => { const task = this.taskQueue.shift(); if (task) { task.resolve(event.data); this.assignNextTask(worker); } }; worker.onerror = (error) => { const task = this.taskQueue.shift(); if (task) { task.reject(error); this.assignNextTask(worker); } }; this.workers.push(worker); } } private assignNextTask(worker: Worker) { const task = this.taskQueue[0]; if (task) { worker.postMessage(task.data); } } async execute(data: any): Promise<any> { return new Promise((resolve, reject) => { this.taskQueue.push({ data, resolve, reject }); // Find idle worker const idleWorker = this.workers.find((w) => !this.taskQueue.includes(w)); if (idleWorker) { this.assignNextTask(idleWorker); } }); } terminate() { this.workers.forEach((worker) => worker.terminate()); this.workers = []; } }
Using Worker Pool:
typescript// main.ts import { WorkerPool } from "./worker-pool.ts"; const pool = new WorkerPool("./data-processor.ts", 4); async function processWithPool(data: number[]) { const promises = data.map((item) => pool.execute({ data: item })); const results = await Promise.all(promises); pool.terminate(); return results; } processWithPool([1, 2, 3, 4, 5, 6, 7, 8]);
2. Error Handling and Retry
typescript// worker-with-retry.ts export async function runWorkerWithRetry<T>( workerFile: string, data: any, maxRetries: number = 3 ): Promise<T> { let lastError: Error | undefined; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { return await runWorker<T>(workerFile, data); } catch (error) { lastError = error as Error; console.error(`Attempt ${attempt} failed: ${error.message}`); if (attempt < maxRetries) { await new Promise((resolve) => setTimeout(resolve, 1000 * attempt)); } } } throw lastError; }
Best Practices
- Use Workers appropriately: Only use Workers for CPU-intensive tasks
- Control concurrency: Limit the number of concurrently running Workers
- Clean up properly: Terminate Workers after use
- Error handling: Handle Worker errors properly
- Message size: Avoid passing overly large messages
- Type safety: Use TypeScript to ensure correct message types
Deno's Web Worker support provides powerful primitives for parallel processing and background work, significantly improving application performance and responsiveness.