乐闻世界logo
搜索文章和话题

RxJS 中的调度器(Scheduler)是什么?如何使用?

2月21日 16:23

调度器(Scheduler)的概念

调度器是 RxJS 中控制何时以及如何执行通知(next、error、complete)的机制。它决定了 Observable 的执行上下文和时序。

为什么需要调度器

  1. 时间控制: 控制任务的执行时间
  2. 并发控制: 管理异步操作的执行顺序
  3. 性能优化: 合理分配任务执行
  4. 测试便利: 在测试中控制时序

RxJS 内置调度器

1. null / undefined(同步调度器)

默认调度器,同步执行所有操作。

javascript
import { of } from 'rxjs'; of(1, 2, 3).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // Next: 1 // Next: 2 // Next: 3 // Complete // After subscription

2. asapScheduler(微任务调度器)

使用 Promise.then() 或 MutationObserver,在微任务队列中执行。

javascript
import { of, asapScheduler } from 'rxjs'; of(1, 2, 3, asapScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // After subscription // Next: 1 // Next: 2 // Next: 3 // Complete

使用场景:

  • 需要在当前调用栈完成后执行
  • 避免阻塞主线程
  • 类似于 setTimeout(fn, 0) 但性能更好

3. asyncScheduler(宏任务调度器)

使用 setInterval,在宏任务队列中执行。

javascript
import { of, asyncScheduler } from 'rxjs'; of(1, 2, 3, asyncScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // After subscription // Next: 1 // Next: 2 // Next: 3 // Complete

使用场景:

  • 需要延迟执行
  • 定时任务
  • 避免阻塞 UI 渲染

4. queueScheduler(队列调度器)

在当前事件帧中调度任务,保持顺序执行。

javascript
import { of, queueScheduler } from 'rxjs'; of(1, 2, 3, queueScheduler).subscribe({ next: value => console.log('Next:', value), complete: () => console.log('Complete') }); console.log('After subscription'); // 输出: // Next: 1 // Next: 2 // Next: 3 // Complete // After subscription

使用场景:

  • 需要保持执行顺序
  • 递归操作
  • 避免栈溢出

5. animationFrameScheduler(动画帧调度器)

基于 requestAnimationFrame,与浏览器渲染周期同步。

javascript
import { interval, animationFrameScheduler } from 'rxjs'; import { take } from 'rxjs/operators'; interval(0, animationFrameScheduler).pipe( take(5) ).subscribe(value => { console.log('Frame:', value); }); // 输出: 与浏览器渲染帧同步的值

使用场景:

  • 动画效果
  • 平滑的 UI 更新
  • 游戏开发

调度器的使用方式

1. 在 Observable 创建时指定

javascript
import { of, asyncScheduler } from 'rxjs'; // 使用 asyncScheduler 延迟执行 const source$ = of(1, 2, 3, asyncScheduler); source$.subscribe(value => console.log(value));

2. 在操作符中使用

javascript
import { of } from 'rxjs'; import { observeOn, subscribeOn } from 'rxjs/operators'; // observeOn: 控制下游的执行调度 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe(value => console.log(value)); // subscribeOn: 控制订阅的执行调度 of(1, 2, 3).pipe( subscribeOn(asyncScheduler) ).subscribe(value => console.log(value));

3. 使用 schedule 方法

javascript
import { asyncScheduler } from 'rxjs'; // 立即执行 asyncScheduler.schedule(() => { console.log('Immediate execution'); }); // 延迟执行 asyncScheduler.schedule(() => { console.log('Delayed execution'); }, 1000); // 周期性执行 let count = 0; asyncScheduler.schedule(function (state) { if (++count > 3) { return; } console.log('Periodic execution:', count); this.schedule(state, 1000); }, 1000);

实际应用场景

1. 延迟执行

javascript
import { of, asyncScheduler } from 'rxjs'; // 延迟 1 秒后执行 of('Hello', asyncScheduler).pipe( delay(1000, asyncScheduler) ).subscribe(message => { console.log(message); });

2. 节流和防抖

javascript
import { fromEvent } from 'rxjs'; import { throttleTime, debounceTime, asyncScheduler } from 'rxjs/operators'; // 节流:每 200ms 最多执行一次 fromEvent(window, 'scroll').pipe( throttleTime(200, asyncScheduler, { leading: true, trailing: true }) ).subscribe(event => { console.log('Throttled scroll event'); }); // 防抖:停止滚动 300ms 后执行 fromEvent(window, 'scroll').pipe( debounceTime(300, asyncScheduler) ).subscribe(event => { console.log('Debounced scroll event'); });

3. 动画效果

javascript
import { interval, animationFrameScheduler } from 'rxjs'; import { map, takeWhile } from 'rxjs/operators'; // 平滑的动画效果 function animate(element: HTMLElement, duration: number) { const startTime = performance.now(); return interval(0, animationFrameScheduler).pipe( map(() => (performance.now() - startTime) / duration), takeWhile(progress => progress <= 1), map(progress => easeInOutQuad(progress)) ); } function easeInOutQuad(t: number): number { return t < 0.5 ? 2 * t * t : -1 + (4 - 2 * t) * t; } animate(element, 1000).subscribe(progress => { element.style.opacity = progress.toString(); });

4. 批量处理

javascript
import { of, queueScheduler } from 'rxjs'; import { map } from 'rxjs/operators'; // 使用 queueScheduler 保持顺序 of(1, 2, 3, 4, 5, queueScheduler).pipe( map(x => { console.log('Processing:', x); return x * 2; }) ).subscribe(value => { console.log('Result:', value); });

5. 递归操作避免栈溢出

javascript
import { queueScheduler } from 'rxjs'; function processLargeArray(array: number[]) { let index = 0; function processNext() { if (index >= array.length) { console.log('Processing complete'); return; } console.log('Processing item:', array[index]); index++; // 使用 queueScheduler 避免栈溢出 queueScheduler.schedule(processNext); } processNext(); } processLargeArray(new Array(100000).fill(0).map((_, i) => i));

调度器对比

调度器执行时机使用场景性能
null/undefined同步默认执行最高
asapScheduler微任务非阻塞执行
asyncScheduler宏任务延迟执行
queueScheduler当前帧保持顺序
animationFrameScheduler动画帧动画效果

最佳实践

1. 选择合适的调度器

javascript
import { of, asyncScheduler, asapScheduler } from 'rxjs'; // 需要延迟执行 of(1, 2, 3, asyncScheduler).subscribe(); // 需要非阻塞执行 of(1, 2, 3, asapScheduler).subscribe();

2. 避免过度使用调度器

javascript
// ❌ 不必要的调度器使用 of(1, 2, 3).pipe( observeOn(asyncScheduler), observeOn(asapScheduler) ).subscribe(); // ✅ 只在需要时使用 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe();

3. 在测试中使用调度器

javascript
import { TestScheduler } from 'rxjs/testing'; describe('My Observable', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should emit values with delay', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c|'); const expected = '--a-b-c|'; expectObservable(source$.pipe( delay(1, testScheduler) })).toBe(expected); }); }); });

4. 动画使用 animationFrameScheduler

javascript
import { interval, animationFrameScheduler } from 'rxjs'; import { take } from 'rxjs/operators'; // ✅ 动画使用 animationFrameScheduler interval(0, animationFrameScheduler).pipe( take(60) // 60 帧动画 ).subscribe(frame => { updateAnimation(frame / 60); }); // ❌ 不要使用 asyncScheduler interval(16, asyncScheduler).pipe( take(60) ).subscribe(frame => { updateAnimation(frame / 60); });

常见问题

1. 调度器是否影响性能?

答案: 是的,调度器会引入一定的性能开销。同步调度器(null)性能最好,异步调度器会有额外的调度开销。

2. 如何选择调度器?

答案:

  • 默认情况:不指定调度器
  • 需要延迟:asyncScheduler
  • 需要非阻塞:asapScheduler
  • 需要保持顺序:queueScheduler
  • 动画效果:animationFrameScheduler

3. observeOn 和 subscribeOn 的区别?

答案:

  • observeOn: 控制下游(订阅者)的执行调度
  • subscribeOn: 控制上游(订阅)的执行调度
javascript
import { of, asyncScheduler } from 'rxjs'; import { observeOn, subscribeOn } from 'rxjs/operators'; // observeOn: 下游在 asyncScheduler 中执行 of(1, 2, 3).pipe( observeOn(asyncScheduler) ).subscribe(value => { console.log('Value:', value); // 在 asyncScheduler 中执行 }); // subscribeOn: 订阅在 asyncScheduler 中执行 of(1, 2, 3).pipe( subscribeOn(asyncScheduler) ).subscribe(value => { console.log('Value:', value); // 在默认调度器中执行 });

总结

调度器是 RxJS 中强大的工具,它提供了:

  1. 时间控制: 精确控制任务的执行时间
  2. 并发管理: 合理管理异步操作的执行顺序
  3. 性能优化: 根据场景选择合适的调度器
  4. 测试支持: 在测试中控制时序

正确使用调度器可以显著提升应用的性能和用户体验。理解不同调度器的特性和使用场景,是成为 RxJS 高级开发者的关键。

标签:Rxjs