
What is a Scheduler in RxJS and how do you use it?

February 21, 16:23

Concept of Scheduler

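A scheduler is a mechanism in RxJS that controls when a subscription starts and when its notifications (next, error, complete) are delivered. It determines both the execution context (synchronous, microtask, macrotask, or animation frame) and the timing of an Observable's work.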

Why We Need Schedulers

  1. Time Control: Control when tasks are executed
  2. Concurrency Control: Manage execution order of asynchronous operations
  3. Performance Optimization: Decide where work runs so that it does not block more than necessary
  4. Testing Convenience: Control timing in tests

Built-in RxJS Schedulers

1. null / undefined (Synchronous Scheduler)

When no scheduler is passed, RxJS delivers all notifications synchronously, inside the subscribe call.

```javascript
import { of } from 'rxjs';

of(1, 2, 3).subscribe({
  next: value => console.log('Next:', value),
  complete: () => console.log('Complete')
});

console.log('After subscription');

// Output:
// Next: 1
// Next: 2
// Next: 3
// Complete
// After subscription
```

2. asapScheduler (Microtask Scheduler)

Schedules work on the microtask queue (current RxJS versions use a resolved Promise for this), so it runs after the current synchronous code finishes and before any timer callback.

```javascript
import { of, asapScheduler } from 'rxjs';

of(1, 2, 3, asapScheduler).subscribe({
  next: value => console.log('Next:', value),
  complete: () => console.log('Complete')
});

console.log('After subscription');

// Output:
// After subscription
// Next: 1
// Next: 2
// Next: 3
// Complete
```

Use cases:

  • Need to execute after the current call stack completes
  • Defer work without waiting for a timer
  • Similar to setTimeout(fn, 0), but runs earlier because microtasks are processed before the next macrotask

3. asyncScheduler (Macrotask Scheduler)

Uses setInterval internally, so work runs as a macrotask after the requested delay (0 by default).

```javascript
import { of, asyncScheduler } from 'rxjs';

of(1, 2, 3, asyncScheduler).subscribe({
  next: value => console.log('Next:', value),
  complete: () => console.log('Complete')
});

console.log('After subscription');

// Output:
// After subscription
// Next: 1
// Next: 2
// Next: 3
// Complete
```

Use cases:

  • Need delayed execution
  • Scheduled tasks
  • Avoid blocking UI rendering
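
The practical difference between the microtask queue (asapScheduler) and the macrotask queue (asyncScheduler) is ordering. The sketch below shows that ordering in plain JavaScript, without RxJS; the `order` array and its labels are made up for illustration, and `queueMicrotask` and `setTimeout` stand in for whatever each scheduler uses internally.

```javascript
// Plain JavaScript illustration of task ordering; no RxJS involved.
const order = [];

// Macrotask: timer callbacks run in a later turn of the event loop.
setTimeout(() => order.push('macrotask'), 0);

// Microtask: runs as soon as the current synchronous code finishes.
queueMicrotask(() => order.push('microtask'));

order.push('sync');

// Report once both callbacks have run.
setTimeout(() => console.log(order.join(' -> ')), 10);
// prints "sync -> microtask -> macrotask"
```

The microtask runs first even though the timer was registered earlier, which is why asapScheduler work lands before asyncScheduler work scheduled in the same tick.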

4. queueScheduler (Queue Scheduler)

With no delay, runs tasks synchronously in the current event frame. A task scheduled from inside a running task is queued until the current one finishes, which keeps execution sequential. With a delay, it behaves like asyncScheduler.

```javascript
import { of, queueScheduler } from 'rxjs';

of(1, 2, 3, queueScheduler).subscribe({
  next: value => console.log('Next:', value),
  complete: () => console.log('Complete')
});

console.log('After subscription');

// Output:
// Next: 1
// Next: 2
// Next: 3
// Complete
// After subscription
```

Use cases:

  • Need to maintain execution order
  • Recursive operations
  • Avoid stack overflow
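
The way a queue avoids stack overflow is often called trampolining: a task scheduled while another task is running is appended to a queue and picked up by the loop that is already draining it, instead of being called as a nested function. The following is a toy sketch of that idea in plain JavaScript. It is not the RxJS implementation, and `createTrampoline`, `schedule`, and `countDown` are hypothetical names.

```javascript
// Toy trampoline; an illustration of the idea, not RxJS internals.
function createTrampoline() {
  const queue = [];
  let flushing = false;

  return function schedule(task) {
    queue.push(task);
    if (flushing) return; // already draining: the running loop picks the task up

    flushing = true;
    try {
      while (queue.length > 0) {
        const next = queue.shift(); // FIFO order
        next();
      }
    } finally {
      flushing = false;
    }
  };
}

const schedule = createTrampoline();
let visited = 0;

function countDown(n) {
  visited++;
  // The re-entrant call is queued, not nested, so the stack stays shallow.
  if (n > 0) schedule(() => countDown(n - 1));
}

schedule(() => countDown(100000));
console.log(visited); // 100001
```

A direct recursion of the same depth would risk exceeding the call stack limit; here each step returns before the next one starts.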

5. animationFrameScheduler (Animation Frame Scheduler)

Based on requestAnimationFrame, synchronized with browser rendering cycle.

```javascript
import { interval, animationFrameScheduler } from 'rxjs';
import { take } from 'rxjs/operators';

interval(0, animationFrameScheduler).pipe(
  take(5)
).subscribe(value => {
  console.log('Frame:', value);
});

// Output: Values synchronized with browser rendering frames
```

Use cases:

  • Animation effects
  • Smooth UI updates
  • Game development

Ways to Use Schedulers

1. Specify when creating Observable

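```javascript
import { of, asyncScheduler } from 'rxjs';

// Use asyncScheduler to deliver the values asynchronously.
// Note: passing a scheduler to of() is deprecated as of RxJS 7;
// scheduled([1, 2, 3], asyncScheduler) is the replacement.
const source$ = of(1, 2, 3, asyncScheduler);

source$.subscribe(value => console.log(value));
```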

2. Use in operators

```javascript
import { of, asyncScheduler } from 'rxjs';
import { observeOn, subscribeOn } from 'rxjs/operators';

// observeOn: Control downstream execution scheduling
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(value => console.log(value));

// subscribeOn: Control subscription execution scheduling
of(1, 2, 3).pipe(
  subscribeOn(asyncScheduler)
).subscribe(value => console.log(value));
```

3. Using schedule method

```javascript
import { asyncScheduler } from 'rxjs';

// No delay: runs as soon as possible, but still asynchronously
asyncScheduler.schedule(() => {
  console.log('Immediate execution');
});

// Delayed execution
asyncScheduler.schedule(() => {
  console.log('Delayed execution');
}, 1000);

// Periodic execution: the action reschedules itself until the limit is reached
let count = 0;
asyncScheduler.schedule(function (state) {
  if (++count > 3) {
    return;
  }
  console.log('Periodic execution:', count);
  // `this` is the scheduled action, so a regular function (not an arrow) is required
  this.schedule(state, 1000);
}, 1000);
```

Practical Use Cases

1. Delayed Execution

```javascript
import { of, asyncScheduler } from 'rxjs';
import { delay } from 'rxjs/operators';

// Execute after 1 second delay
of('Hello', asyncScheduler).pipe(
  delay(1000, asyncScheduler)
).subscribe(message => {
  console.log(message);
});
```

2. Throttling and Debouncing

```javascript
import { fromEvent, asyncScheduler } from 'rxjs';
import { throttleTime, debounceTime } from 'rxjs/operators';

// Throttle: Execute at most once every 200ms
fromEvent(window, 'scroll').pipe(
  throttleTime(200, asyncScheduler, { leading: true, trailing: true })
).subscribe(event => {
  console.log('Throttled scroll event');
});

// Debounce: Execute 300ms after scrolling stops
fromEvent(window, 'scroll').pipe(
  debounceTime(300, asyncScheduler)
).subscribe(event => {
  console.log('Debounced scroll event');
});
```

3. Animation Effects

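```typescript
import { interval, animationFrameScheduler } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

// Smooth animation effect: emits eased progress once per animation frame
function animate(element: HTMLElement, duration: number) {
  const startTime = performance.now();

  return interval(0, animationFrameScheduler).pipe(
    // Clamp to 1 so the final frame lands exactly on the end state
    map(() => Math.min((performance.now() - startTime) / duration, 1)),
    // Inclusive takeWhile: emit the final value (1), then complete
    takeWhile(progress => progress < 1, true),
    map(progress => easeInOutQuad(progress))
  );
}

function easeInOutQuad(t: number): number {
  return t < 0.5 ? 2 * t * t : -1 + (4 - 2 * t) * t;
}

// '#box' is a hypothetical element used for illustration
const element = document.querySelector<HTMLElement>('#box')!;

animate(element, 1000).subscribe(progress => {
  element.style.opacity = progress.toString();
});
```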

4. Batch Processing

```javascript
import { of, queueScheduler } from 'rxjs';
import { map } from 'rxjs/operators';

// Use queueScheduler to maintain order
of(1, 2, 3, 4, 5, queueScheduler).pipe(
  map(x => {
    console.log('Processing:', x);
    return x * 2;
  })
).subscribe(value => {
  console.log('Result:', value);
});
```

5. Avoiding Stack Overflow in Recursive Operations

```typescript
import { queueScheduler } from 'rxjs';

function processLargeArray(array: number[]) {
  let index = 0;

  function processNext() {
    if (index >= array.length) {
      console.log('Processing complete');
      return;
    }

    console.log('Processing item:', array[index]);
    index++;

    // queueScheduler queues the next step instead of nesting the call,
    // so the call stack does not grow with the array length
    queueScheduler.schedule(processNext);
  }

  processNext();
}

processLargeArray(new Array(100000).fill(0).map((_, i) => i));
```

Scheduler Comparison

| Scheduler | Execution Timing | Use Cases | Performance |
| --- | --- | --- | --- |
| null/undefined | Synchronous | Default execution | Highest |
| asapScheduler | Microtask | Non-blocking execution | High |
| asyncScheduler | Macrotask | Delayed execution | Medium |
| queueScheduler | Current frame | Maintain order | High |
| animationFrameScheduler | Animation frame | Animation effects | Medium |

Best Practices

1. Choose the Right Scheduler

```javascript
import { of, asyncScheduler, asapScheduler } from 'rxjs';

// Need delayed execution
of(1, 2, 3, asyncScheduler).subscribe();

// Need non-blocking execution
of(1, 2, 3, asapScheduler).subscribe();
```

2. Avoid Overusing Schedulers

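```javascript
import { of, asyncScheduler, asapScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// ❌ Unnecessary scheduler usage
of(1, 2, 3).pipe(
  observeOn(asyncScheduler),
  observeOn(asapScheduler)
).subscribe();

// ✅ Only use when needed
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe();
```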

3. Use Schedulers in Testing

```typescript
import { TestScheduler } from 'rxjs/testing';
import { delay } from 'rxjs/operators';

describe('My Observable', () => {
  let testScheduler: TestScheduler;

  beforeEach(() => {
    testScheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should emit values with delay', () => {
    testScheduler.run(({ cold, expectObservable }) => {
      // The source never completes, so the assertion covers only the delayed values
      const source$ = cold('-a-b-c');
      const expected = '--a-b-c';

      // Inside run(), one marble frame is 1 ms of virtual time
      expectObservable(source$.pipe(
        delay(1, testScheduler)
      )).toBe(expected);
    });
  });
});
```
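
What makes this kind of test fast and deterministic is virtual time: delays are recorded as numbers and replayed in order, without waiting on real timers. The following toy sketch shows the idea in plain JavaScript. It is not the TestScheduler API, and `VirtualClock` with its `schedule` and `flush` methods is a hypothetical name chosen for this example.

```javascript
// Toy virtual-time scheduler; a sketch of the idea, not the TestScheduler API.
class VirtualClock {
  constructor() {
    this.now = 0;      // current virtual time in ms
    this.pending = []; // tasks waiting to run
  }

  // Record a task to run `delay` virtual milliseconds from now.
  schedule(task, delay = 0) {
    this.pending.push({ due: this.now + delay, task });
  }

  // Run every pending task in due-time order, advancing the clock instantly.
  flush() {
    while (this.pending.length > 0) {
      this.pending.sort((a, b) => a.due - b.due);
      const { due, task } = this.pending.shift();
      this.now = due;
      task();
    }
  }
}

const clock = new VirtualClock();
const log = [];

clock.schedule(() => log.push(`b@${clock.now}`), 1000);
clock.schedule(() => log.push(`a@${clock.now}`), 10);

clock.flush(); // completes immediately, no real waiting
console.log(log.join(', ')); // a@10, b@1000
```

The one-second delay costs no wall-clock time, and the order of execution depends only on the recorded due times.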

4. Use animationFrameScheduler for Animations

```javascript
import { interval, animationFrameScheduler, asyncScheduler } from 'rxjs';
import { take } from 'rxjs/operators';

// updateAnimation is assumed to be defined elsewhere in the application

// ✅ Use animationFrameScheduler for animations
interval(0, animationFrameScheduler).pipe(
  take(60) // 60 frame animation
).subscribe(frame => {
  updateAnimation(frame / 60);
});

// ❌ Don't use asyncScheduler: a 16ms timer is not aligned with actual repaints
interval(16, asyncScheduler).pipe(
  take(60)
).subscribe(frame => {
  updateAnimation(frame / 60);
});
```

Common Questions

1. Do schedulers affect performance?

Answer: Yes, schedulers introduce some performance overhead. Synchronous scheduler (null) has the best performance, while async schedulers have additional scheduling overhead.

2. How to choose a scheduler?

Answer:

  • Default: Don't specify a scheduler
  • Need delay: asyncScheduler
  • Need to defer until the current synchronous code finishes: asapScheduler
  • Need to maintain order: queueScheduler
  • Animation effects: animationFrameScheduler

3. Difference between observeOn and subscribeOn?

Answer:

  • observeOn: Controls which scheduler delivers notifications to the downstream operators and the subscriber
  • subscribeOn: Controls which scheduler performs the subscription to the upstream source

```javascript
import { of, asyncScheduler } from 'rxjs';
import { observeOn, subscribeOn } from 'rxjs/operators';

// observeOn: each notification is re-emitted on asyncScheduler
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(value => {
  console.log('Value:', value); // Executes in asyncScheduler
});

// subscribeOn: the subscription itself is deferred to asyncScheduler
of(1, 2, 3).pipe(
  subscribeOn(asyncScheduler)
).subscribe(value => {
  // of() emits synchronously once subscribed, so all values arrive
  // together inside the single deferred subscription task
  console.log('Value:', value);
});
```

Summary

Schedulers are a powerful tool in RxJS that provide:

  1. Time Control: Precisely control when tasks are executed
  2. Concurrency Management: Reasonably manage execution order of asynchronous operations
  3. Performance Optimization: Choose appropriate scheduler based on scenario
  4. Testing Support: Control timing in tests

Proper use of schedulers can improve an application's responsiveness and user experience, while unnecessary use only adds overhead. Understanding the characteristics and use cases of the different schedulers is key to becoming an advanced RxJS developer.

Tags: RxJS