调度器(Scheduler)的概念
调度器是 RxJS 中控制何时以及如何执行通知(next、error、complete)的机制。它决定了 Observable 的执行上下文和时序。
为什么需要调度器
- 时间控制: 控制任务的执行时间
- 并发控制: 管理异步操作的执行顺序
- 性能优化: 合理分配任务执行
- 测试便利: 在测试中控制时序
RxJS 内置调度器
1. null / undefined(同步调度器)
默认调度器,同步执行所有操作。
javascriptimport { of } from 'rxjs'; of(1, 2, 3).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // Next: 1 // Next: 2 // Next: 3 // Complete // After subscription
2. asapScheduler(微任务调度器)
使用 Promise.then() 或 MutationObserver,在微任务队列中执行。
javascriptimport { of, asapScheduler } from 'rxjs'; of(1, 2, 3, asapScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // After subscription // Next: 1 // Next: 2 // Next: 3 // Complete
使用场景:
- 需要在当前调用栈完成后执行
- 避免阻塞主线程
- 类似于 setTimeout(fn, 0) 但性能更好
3. asyncScheduler(宏任务调度器)
使用 setInterval,在宏任务队列中执行。
javascriptimport { of, asyncScheduler } from 'rxjs'; of(1, 2, 3, asyncScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // After subscription // Next: 1 // Next: 2 // Next: 3 // Complete
使用场景:
- 需要延迟执行
- 定时任务
- 避免阻塞 UI 渲染
4. queueScheduler(队列调度器)
在当前事件帧中调度任务,保持顺序执行。
javascriptimport { of, queueScheduler } from 'rxjs'; of(1, 2, 3, queueScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // Next: 1 // Next: 2 // Next: 3 // Complete // After subscription
使用场景:
- 需要保持执行顺序
- 递归操作
- 避免栈溢出
5. animationFrameScheduler(动画帧调度器)
基于 requestAnimationFrame,与浏览器渲染周期同步。
javascriptimport { interval, animationFrameScheduler } from 'rxjs'; import { take } from 'rxjs/operators'; interval(0, animationFrameScheduler).pipe( take(5) ).subscribe(value => { console.log('Frame:', value); }); // 输出: 与浏览器渲染帧同步的值
使用场景:
- 动画效果
- 平滑的 UI 更新
- 游戏开发
调度器的使用方式
1. 在 Observable 创建时指定
javascriptimport { of, asyncScheduler } from 'rxjs'; // 使用 asyncScheduler 延迟执行 const source$ = of(1, 2, 3, asyncScheduler); source$.subscribe(value => console.log(value));
2. 在操作符中使用
javascriptimport { of } from 'rxjs'; import { observeOn, subscribeOn } from 'rxjs/operators'; // observeOn: 控制下游的执行调度 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe(value => console.log(value)); // subscribeOn: 控制订阅的执行调度 of(1, 2, 3).pipe( subscribeOn(asyncScheduler) ).subscribe(value => console.log(value));
3. 使用 schedule 方法
javascriptimport { asyncScheduler } from 'rxjs'; // 立即执行 asyncScheduler.schedule(() => { console.log('Immediate execution'); }); // 延迟执行 asyncScheduler.schedule(() => { console.log('Delayed execution'); }, 1000); // 周期性执行 let count = 0; asyncScheduler.schedule(function (state) { if (++count > 3) { return; } console.log('Periodic execution:', count); this.schedule(state, 1000); }, 1000);
实际应用场景
1. 延迟执行
javascriptimport { of, asyncScheduler } from 'rxjs'; // 延迟 1 秒后执行 of('Hello', asyncScheduler).pipe( delay(1000, asyncScheduler) ).subscribe(message => { console.log(message); });
2. 节流和防抖
javascriptimport { fromEvent } from 'rxjs'; import { throttleTime, debounceTime, asyncScheduler } from 'rxjs/operators'; // 节流:每 200ms 最多执行一次 fromEvent(window, 'scroll').pipe( throttleTime(200, asyncScheduler, { leading: true, trailing: true }) ).subscribe(event => { console.log('Throttled scroll event'); }); // 防抖:停止滚动 300ms 后执行 fromEvent(window, 'scroll').pipe( debounceTime(300, asyncScheduler) ).subscribe(event => { console.log('Debounced scroll event'); });
3. 动画效果
javascriptimport { interval, animationFrameScheduler } from 'rxjs'; import { map, takeWhile } from 'rxjs/operators'; // 平滑的动画效果 function animate(element: HTMLElement, duration: number) { const startTime = performance.now(); return interval(0, animationFrameScheduler).pipe( map(() => (performance.now() - startTime) / duration), takeWhile(progress => progress <= 1), map(progress => easeInOutQuad(progress)) ); } function easeInOutQuad(t: number): number { return t < 0.5 ? 2 * t * t : -1 + (4 - 2 * t) * t; } animate(element, 1000).subscribe(progress => { element.style.opacity = progress.toString(); });
4. 批量处理
javascriptimport { of, queueScheduler } from 'rxjs'; import { map } from 'rxjs/operators'; // 使用 queueScheduler 保持顺序 of(1, 2, 3, 4, 5, queueScheduler).pipe( map(x => { console.log('Processing:', x); return x * 2; }) ).subscribe(value => { console.log('Result:', value); });
5. 递归操作避免栈溢出
javascriptimport { queueScheduler } from 'rxjs'; function processLargeArray(array: number[]) { let index = 0; function processNext() { if (index >= array.length) { console.log('Processing complete'); return; } console.log('Processing item:', array[index]); index++; // 使用 queueScheduler 避免栈溢出 queueScheduler.schedule(processNext); } processNext(); } processLargeArray(new Array(100000).fill(0).map((_, i) => i));
调度器对比
| 调度器 | 执行时机 | 使用场景 | 性能 |
|---|---|---|---|
| null/undefined | 同步 | 默认执行 | 最高 |
| asapScheduler | 微任务 | 非阻塞执行 | 高 |
| asyncScheduler | 宏任务 | 延迟执行 | 中 |
| queueScheduler | 当前帧 | 保持顺序 | 高 |
| animationFrameScheduler | 动画帧 | 动画效果 | 中 |
最佳实践
1. 选择合适的调度器
javascriptimport { of, asyncScheduler, asapScheduler } from 'rxjs'; // 需要延迟执行 of(1, 2, 3, asyncScheduler).subscribe(); // 需要非阻塞执行 of(1, 2, 3, asapScheduler).subscribe();
2. 避免过度使用调度器
javascript// ❌ 不必要的调度器使用 of(1, 2, 3).pipe( observeOn(asyncScheduler), observeOn(asapScheduler) ).subscribe(); // ✅ 只在需要时使用 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe();
3. 在测试中使用调度器
javascriptimport { TestScheduler } from 'rxjs/testing'; describe('My Observable', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should emit values with delay', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c|'); const expected = '--a-b-c|'; expectObservable(source$.pipe( delay(1, testScheduler) })).toBe(expected); }); }); });
4. 动画使用 animationFrameScheduler
javascriptimport { interval, animationFrameScheduler } from 'rxjs'; import { take } from 'rxjs/operators'; // ✅ 动画使用 animationFrameScheduler interval(0, animationFrameScheduler).pipe( take(60) // 60 帧动画 ).subscribe(frame => { updateAnimation(frame / 60); }); // ❌ 不要使用 asyncScheduler interval(16, asyncScheduler).pipe( take(60) ).subscribe(frame => { updateAnimation(frame / 60); });
常见问题
1. 调度器是否影响性能?
答案: 是的,调度器会引入一定的性能开销。同步调度器(null)性能最好,异步调度器会有额外的调度开销。
2. 如何选择调度器?
答案:
- 默认情况:不指定调度器
- 需要延迟:asyncScheduler
- 需要非阻塞:asapScheduler
- 需要保持顺序:queueScheduler
- 动画效果:animationFrameScheduler
3. observeOn 和 subscribeOn 的区别?
答案:
observeOn: 控制下游(订阅者)的执行调度subscribeOn: 控制上游(订阅)的执行调度
javascriptimport { of, asyncScheduler } from 'rxjs'; import { observeOn, subscribeOn } from 'rxjs/operators'; // observeOn: 下游在 asyncScheduler 中执行 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe(value => { console.log('Value:', value); // 在 asyncScheduler 中执行 }); // subscribeOn: 订阅在 asyncScheduler 中执行 of(1, 2, 3).pipe( subscribeOn(asyncScheduler) ).subscribe(value => { console.log('Value:', value); // 在默认调度器中执行 });
总结
调度器是 RxJS 中强大的工具,它提供了:
- 时间控制: 精确控制任务的执行时间
- 并发管理: 合理管理异步操作的执行顺序
- 性能优化: 根据场景选择合适的调度器
- 测试支持: 在测试中控制时序
正确使用调度器可以显著提升应用的性能和用户体验。理解不同调度器的特性和使用场景,是成为 RxJS 高级开发者的关键。